| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| from collections import defaultdict, namedtuple |
| import datetime |
| import json |
| import logging |
| import os |
| import random |
| import re |
| from subprocess import check_call, check_output |
| import time |
| |
| from avro.datafile import DataFileReader |
| from avro.io import DatumReader |
| from builtins import range |
| import pytest |
| import pytz |
| |
| # noinspection PyUnresolvedReferences |
| from impala_thrift_gen.parquet.ttypes import ConvertedType |
| from tests.common.file_utils import ( |
| create_iceberg_table_from_directory, |
| create_table_from_parquet, |
| ) |
| from tests.common.iceberg_test_suite import IcebergTestSuite |
| from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION |
| from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfFS |
| from tests.common.test_dimensions import add_exec_option_dimension |
| from tests.common.test_result_verifier import error_msg_startswith |
| from tests.shell.util import run_impala_shell_cmd |
| from tests.util.filesystem_utils import FILESYSTEM_PREFIX, get_fs_path, IS_HDFS, WAREHOUSE |
| from tests.util.get_parquet_metadata import get_parquet_metadata |
| from tests.util.iceberg_util import cast_ts, get_snapshots, IcebergCatalogs, quote |
| from tests.util.parse_util import bytes_to_str |
| |
| LOG = logging.getLogger(__name__) |
| |
| |
| class TestIcebergTable(IcebergTestSuite): |
| """Tests related to Iceberg tables.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestIcebergTable, cls).add_test_dimensions() |
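    # Run these tests only against the Parquet table format dimension.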
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('table_format').file_format == 'parquet') |
| |
| def test_iceberg_negative(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-negative', vector, use_db=unique_database) |
| |
| def test_create_iceberg_tables(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database) |
| |
| def test_alter_iceberg_tables_v1(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-alter-v1', vector, use_db=unique_database) |
| |
| def test_alter_iceberg_tables_v2(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-alter-v2', vector, use_db=unique_database) |
| |
| def test_alter_iceberg_tables_default(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-alter-default', vector, use_db=unique_database) |
| |
| def test_iceberg_binary_type(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-binary-type', vector, use_db=unique_database) |
| |
| def test_external_iceberg_tables(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-external', vector, unique_database) |
| |
| def test_expire_snapshots(self, unique_database): |
| tbl_name = unique_database + ".expire_snapshots" |
| iceberg_catalogs = IcebergCatalogs(unique_database) |
| for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties(): |
| # We are setting the TIMEZONE query option in this test, so let's create a local |
| # impala client. |
| with self.create_impala_client() as impalad_client: |
| # Iceberg doesn't create a snapshot entry for the initial empty table |
| impalad_client.execute(""" |
| create table {0} (i int) stored as iceberg |
| TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties)) |
| ts_0 = datetime.datetime.now() |
| insert_q = "insert into {0} values (1)".format(tbl_name) |
| ts_1 = self.execute_query_ts(impalad_client, insert_q) |
| time.sleep(1) |
| impalad_client.execute(insert_q) |
| time.sleep(1) |
| ts_2 = self.execute_query_ts(impalad_client, insert_q) |
| impalad_client.execute(insert_q) |
| |
| # There should be 4 snapshots initially |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4) |
        # Expire the oldest snapshot and check that it was indeed removed
| expire_q = "alter table {0} execute expire_snapshots({1})" |
| impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1))) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3) |
| |
        # Expire with a timestamp whose interval does not cover any existing snapshot
| impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1))) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3) |
| |
| # Expire all, but retain 1 |
| impalad_client.execute(expire_q.format(tbl_name, |
| cast_ts(datetime.datetime.now()))) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1) |
| |
| # Change number of retained snapshots, then expire all |
| impalad_client.execute("""alter table {0} set tblproperties |
| ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name)) |
| impalad_client.execute(insert_q) |
| impalad_client.execute(insert_q) |
| impalad_client.execute(expire_q.format(tbl_name, |
| cast_ts(datetime.datetime.now()))) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2) |
| |
        # Check that the timestamp is interpreted in the local timezone controlled by the
        # TIMEZONE query option.
| impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") |
| impalad_client.execute(insert_q) |
| ts_tokyo = self.impala_now(impalad_client) |
| impalad_client.execute("SET TIMEZONE='Europe/Budapest'") |
| impalad_client.execute(insert_q) |
| impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") |
| impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo))) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1) |
| impalad_client.execute("DROP TABLE {0}".format(tbl_name)) |
| |
| def test_truncate_iceberg_tables(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database) |
| |
| @SkipIf.not_dfs |
| def test_drop_incomplete_table(self, unique_database): |
| """Test DROP TABLE when the underlying directory is deleted. In that case table |
| loading fails, but we should be still able to drop the table from Impala.""" |
| tbl_name = unique_database + ".synchronized_iceberg_tbl" |
| cat_location = get_fs_path("/test-warehouse/" + unique_database) |
| self.client.execute("""create table {0} (i int) stored as iceberg |
| tblproperties('iceberg.catalog'='hadoop.catalog', |
| 'iceberg.catalog_location'='{1}')""".format(tbl_name, cat_location)) |
| self.filesystem_client.delete_file_dir(cat_location, True) |
| self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name)) |
| |
  @SkipIf.not_dfs(reason="DFS is required because the test deletes files directly.")
| def test_drop_corrupt_table(self, unique_database): |
| """Test that if the underlying iceberg metadata directory is deleted, then a query |
| fails with a reasonable error message, and the table can be dropped successfully.""" |
| table = "corrupt_iceberg_tbl" |
| full_table_name = unique_database + "." + table |
| self.client.execute("""create table {0} (i int) stored as iceberg""". |
| format(full_table_name)) |
| metadata_location = get_fs_path("""/test-warehouse/{0}.db/{1}/metadata""".format( |
| unique_database, table)) |
| assert self.filesystem_client.exists(metadata_location) |
| status = self.filesystem_client.delete_file_dir(metadata_location, True) |
| assert status, "Delete failed with {0}".format(status) |
| assert not self.filesystem_client.exists(metadata_location) |
| |
| # Invalidate so that table loading problems will happen in the catalog. |
| self.client.execute("invalidate metadata {0}".format(full_table_name)) |
| |
| # Query should now fail. |
| err = self.execute_query_expect_failure(self.client, """select * from {0}""". |
| format(full_table_name)) |
| result = str(err) |
| assert "AnalysisException: Failed to load metadata for table" in result |
| assert ("Failed to load metadata for table" in result # local catalog |
| or "Error loading metadata for Iceberg table" in result) # default catalog |
| self.execute_query_expect_success(self.client, """drop table {0}""". |
| format(full_table_name)) |
| |
| def test_insert(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database) |
| |
| def test_partitioned_insert_v1(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-partitioned-insert-v1', vector, |
| use_db=unique_database) |
| |
| def test_partitioned_insert_v2(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-partitioned-insert-v2', vector, |
| use_db=unique_database) |
| |
| def test_partitioned_insert_default(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-partitioned-insert-default', vector, |
| use_db=unique_database) |
| |
| def test_insert_overwrite(self, vector, unique_database): |
| """Run iceberg-overwrite tests, then test that INSERT INTO/OVERWRITE queries running |
    concurrently with a long-running INSERT OVERWRITE are handled gracefully. query_a is
| started before query_b/query_c, but query_b/query_c are supposed to finish before |
| query_a. query_a should fail because the overwrite should not erase query_b/query_c's |
| result.""" |
| # Run iceberg-overwrite.test |
| self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database) |
| |
    # Create a test dataset for the concurrency tests and warm up the test table
| tbl_name = unique_database + ".overwrite_tbl" |
| self.client.execute("""create table {0} (i int) |
| partitioned by spec (truncate(3, i)) |
| stored as iceberg""".format(tbl_name)) |
| self.client.execute("insert into {0} values (1), (2), (3);".format(tbl_name)) |
| |
    # Test queries: 'a' is the long-running query while 'b' and 'c' are the short ones
| query_a = """insert overwrite {0} select sleep(5000);""".format(tbl_name) |
| query_b = """insert overwrite {0} select * from {0};""".format(tbl_name) |
| query_c = """insert into {0} select * from {0};""".format(tbl_name) |
| |
    # Test concurrent INSERT OVERWRITEs; the exception closes the query handle.
| handle = self.client.execute_async(query_a) |
| time.sleep(1) |
| self.client.execute(query_b) |
| try: |
| self.client.wait_for_finished_timeout(handle, 30) |
| assert False |
| except IMPALA_CONNECTION_EXCEPTION as e: |
| assert "Found conflicting files" in str(e) |
| |
    # Test INSERT INTO during INSERT OVERWRITE; the exception closes the query handle.
| handle = self.client.execute_async(query_a) |
| time.sleep(1) |
| self.client.execute(query_c) |
| try: |
| self.client.wait_for_finished_timeout(handle, 30) |
| assert False |
| except IMPALA_CONNECTION_EXCEPTION as e: |
| assert "Found conflicting files" in str(e) |
| |
| def test_ctas(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-ctas', vector, use_db=unique_database) |
| |
| def test_partition_transform_insert(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-partition-transform-insert', vector, |
| use_db=unique_database) |
| |
| def test_iceberg_orc_field_id(self, vector): |
| self.run_test_case('QueryTest/iceberg-orc-field-id', vector) |
| |
| def test_catalogs(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-catalogs', vector, use_db=unique_database) |
| |
| def test_missing_field_ids(self, vector): |
| self.run_test_case('QueryTest/iceberg-missing-field-ids', vector) |
| |
| def test_migrated_tables(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-migrated-tables', vector, unique_database) |
| |
| def test_migrated_table_field_id_resolution(self, vector, unique_database): |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_migrated_alter_test", "parquet") |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_migrated_complex_test", "parquet") |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_migrated_alter_test_orc", "orc") |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_migrated_complex_test_orc", "orc") |
| self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution', |
| vector, unique_database) |
| if IS_HDFS: |
| self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-orc', |
| vector, unique_database) |
| |
| def test_column_case_sensitivity(self, vector, unique_database): |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_column_case_sensitivity_issue", "parquet") |
| self.run_test_case('QueryTest/iceberg-column-case-sensitivity-issue', |
| vector, unique_database) |
| |
| @SkipIfFS.hive |
| def test_migrated_table_field_id_resolution_complex(self, vector, unique_database): |
| def get_table_loc(tbl_name): |
| return '%s/%s.db/%s/' % (WAREHOUSE, unique_database, tbl_name) |
| |
| def create_table(tbl_name, file_format, partition_cols): |
| self.execute_query("""CREATE TABLE IF NOT EXISTS {}.{} ( |
| id INT, |
| name STRING, |
| teststeps array<struct<step_number:int,step_description:string>>) |
| PARTITIONED BY ({}) |
| STORED AS {} |
| """.format(unique_database, tbl_name, partition_cols, file_format)) |
| |
| def add_file_to_table_partition(tbl_name, part_dir, local_filename): |
| tbl_loc = get_table_loc(tbl_name) |
| part_dir = os.path.join(tbl_loc, part_dir) |
| self.filesystem_client.make_dir(part_dir) |
| data_file_path = os.path.join(os.environ['IMPALA_HOME'], "testdata", |
| "migrated_iceberg", local_filename) |
| self.filesystem_client.copy_from_local(data_file_path, part_dir) |
| |
| def finalize_table(tbl_name): |
| self.execute_query("ALTER TABLE {}.{} RECOVER PARTITIONS".format( |
| unique_database, tbl_name)) |
| self.execute_query("ALTER TABLE {}.{} CONVERT TO ICEBERG".format( |
| unique_database, tbl_name)) |
| |
| def prepare_test_table(tbl_name, file_format, partition_cols, part_dir, datafile): |
| create_table(tbl_name, file_format, partition_cols) |
| add_file_to_table_partition(tbl_name, part_dir, datafile) |
| finalize_table(tbl_name) |
| |
| prepare_test_table('all_part_cols_stored_parquet', |
| "PARQUET", |
| "result_date STRING", |
| "result_date=2024-08-26", |
| "complextypes_and_partition_columns_in_data_files.parquet") |
| prepare_test_table('not_all_part_cols_stored_parquet', |
| "PARQUET", |
| "result_date STRING, p INT", |
| "result_date=2024-08-26/p=3", |
| "complextypes_and_partition_columns_in_data_files.parquet") |
| prepare_test_table('all_part_cols_stored_orc', |
| "ORC", |
| "result_date STRING", |
| "result_date=2024-08-26", |
| "complextypes_and_partition_columns_in_data_files.orc") |
| prepare_test_table('not_all_part_cols_stored_orc', |
| "ORC", |
| "result_date STRING, p INT", |
| "result_date=2024-08-26/p=3", |
| "complextypes_and_partition_columns_in_data_files.orc") |
| |
| self.run_test_case('QueryTest/iceberg-migrated-table-field-id-resolution-complex', |
| vector, unique_database) |
| |
| def test_describe_history(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database) |
| |
| # Create a table with multiple snapshots and verify the table history. |
| tbl_name = unique_database + ".iceberg_multi_snapshots" |
| self.client.execute("""create table {0} (i int) stored as iceberg |
| tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name)) |
| self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name)) |
| self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name)) |
| snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2) |
| first_snapshot = snapshots[0] |
| second_snapshot = snapshots[1] |
| # Check that first snapshot is older than the second snapshot. |
| assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time()) |
| # Check that second snapshot's parent ID is the snapshot ID of the first snapshot. |
| assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id()) |
| # The first snapshot has no parent snapshot ID. |
| assert(first_snapshot.get_parent_id() is None) |
| # Check "is_current_ancestor" column. |
| assert(first_snapshot.is_current_ancestor()) |
| assert(second_snapshot.is_current_ancestor()) |
| |
| def test_execute_rollback_negative(self, vector): |
| """Negative test for EXECUTE ROLLBACK.""" |
| self.run_test_case('QueryTest/iceberg-rollback-negative', vector) |
| |
| def test_execute_rollback(self, unique_database): |
| """Test for EXECUTE ROLLBACK.""" |
| iceberg_catalogs = IcebergCatalogs(unique_database) |
| for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties(): |
| # Create a table with multiple snapshots. |
| tbl_name = unique_database + ".iceberg_execute_rollback" |
| # We are setting the TIMEZONE query option in this test, so let's create a local |
| # impala client. |
| with self.create_impala_client() as impalad_client: |
| orig_timezone = 'America/Los_Angeles' |
| impalad_client.execute("SET TIMEZONE='" + orig_timezone + "'") |
| impalad_client.execute(""" |
| create table {0} (i int) stored as iceberg |
| TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties)) |
| initial_snapshots = 3 |
| for i in range(initial_snapshots): |
| impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i)) |
| snapshots = get_snapshots(impalad_client, tbl_name, |
| expected_result_size=initial_snapshots) |
| |
| output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id()) |
| LOG.info("success output={0}".format(output)) |
| |
| # We rolled back, but that creates a new snapshot, so now there are 4. |
| snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4) |
| # The new snapshot has the same id (and parent id) as the snapshot we rolled back |
| # to, but it has a different creation time. |
| assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id() |
| assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id() |
| assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time() |
| # The "orphaned" snapshot is now not a current ancestor. |
| assert not snapshots[2].is_current_ancestor() |
| |
| # We cannot roll back to a snapshot that is not a current ancestor. |
| output = self.rollback_to_id_expect_failure(tbl_name, |
| snapshots[2].get_snapshot_id(), |
| expected_text="Cannot roll back to snapshot, not an ancestor of the current " |
| "state") |
| |
| # Create another snapshot. |
| before_insert = datetime.datetime.now(pytz.timezone(orig_timezone)) |
| impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4)) |
| snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5) |
| |
| # Rollback to before the last insert. |
| self.rollback_to_ts(impalad_client, tbl_name, before_insert) |
| # This creates another snapshot. |
| snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6) |
        # The snapshot id is the same, but the creation times differ.
| assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id() |
| assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time() |
| assert not snapshots[4].is_current_ancestor() |
| |
        # Show that EXECUTE ROLLBACK respects the current timezone. To do this we try
        # to roll back to a time for which there is no snapshot; this fails with an
        # error message that includes the specified time. We parse out that time. By
        # doing this in two timezones we can see that the parameter was interpreted in
        # the current timezone.
| one_hour_ago = before_insert - datetime.timedelta(hours=1) |
        # We use timezones from Japan and Iceland to avoid any DST complexities.
| impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") |
| japan_ts = self.get_snapshot_ts_from_failed_rollback( |
| impalad_client, tbl_name, one_hour_ago) |
| impalad_client.execute("SET TIMEZONE='Iceland'") |
| iceland_ts = self.get_snapshot_ts_from_failed_rollback( |
| impalad_client, tbl_name, one_hour_ago) |
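        # The reported timestamps are milliseconds since the epoch. Tokyo is UTC+9 and
        # Iceland is UTC+0 with no DST, so the same wall-clock time interpreted in the
        # two timezones differs by exactly 9 hours.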
| diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60) |
| assert diff_hours == 9 |
| |
| impalad_client.execute("DROP TABLE {0}".format(tbl_name)) |
| |
| def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts): |
| """Run an EXECUTE ROLLBACK which is expected to fail. |
| Parse the error message to extract the timestamp for which there |
| was no snapshot, and convert the string to an integer""" |
| try: |
| self.rollback_to_ts(client, tbl_name, ts) |
| assert False, "Query should have failed" |
| except IMPALA_CONNECTION_EXCEPTION as e: |
| result = re.search(r".*no valid snapshot older than: (\d+)", str(e)) |
| time_str = result.group(1) |
| snapshot_ts = int(time_str) |
| assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result) |
| return snapshot_ts |
| |
| def rollback_to_ts(self, client, tbl_name, ts): |
| """Rollback a table to a snapshot timestamp.""" |
| query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat()) |
| return self.execute_query_expect_success(client, query) |
| |
| def rollback_to_id(self, tbl_name, id): |
| """Rollback a table to a snapshot id.""" |
| query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id) |
| return self.execute_query_expect_success(self.client, query) |
| |
| def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None): |
| """Attempt to roll back a table to a snapshot id, expecting a failure.""" |
| query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id) |
| output = self.execute_query_expect_failure(self.client, query) |
| if expected_text: |
| assert expected_text in str(output) |
| return output |
| |
| def test_execute_remove_orphan_files(self, unique_database): |
| tbl_name = 'tbl_with_orphan_files' |
| db_tbl = unique_database + "." + tbl_name |
| with self.create_impala_client() as impalad_client: |
| impalad_client.execute("create table {0} (i int) stored as iceberg" |
| .format(db_tbl)) |
| insert_q = "insert into {0} values ({1})" |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1)) |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2)) |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3)) |
| result = impalad_client.execute('select i from {} order by i'.format(db_tbl)) |
| assert result.data == ['1', '2', '3'] |
| |
      # Add some junk files to the data and metadata directories.
| TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name) |
| DATA_PATH = os.path.join(TABLE_PATH, "data") |
| METADATA_PATH = os.path.join(TABLE_PATH, "metadata") |
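      # Junk files come from the mixed-file-format test data set; the two placeholders
      # in SRC_DIR select the subdirectory ('data' or 'metadata') and the file name.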
| SRC_DIR = os.path.join( |
| os.environ['IMPALA_HOME'], |
| "testdata/data/iceberg_test/iceberg_mixed_file_format_test/{0}/{1}") |
| |
| # Copy first set of junk files. |
| file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \ |
| "fff150b6136a-job_16619542960420_0002-1-00001.parquet" |
| file_avro1 = "055baf62-de6d-4583-bf21-f187f9482343-m0.avro" |
| self.filesystem_client.copy_from_local( |
| SRC_DIR.format('data', file_parq1), DATA_PATH) |
| self.filesystem_client.copy_from_local( |
| SRC_DIR.format('metadata', file_avro1), METADATA_PATH) |
| assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1)) |
| assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro1)) |
      # Record the current time.
| result = impalad_client.execute('select cast(now() as string)') |
| cp1_time = result.data[0] |
| time.sleep(1) |
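      # The second set of junk files must be newer than cp1_time so that the first
      # REMOVE_ORPHAN_FILES call leaves them untouched.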
| |
| # Copy second set of junk files. |
| file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \ |
| "27ff880faff0-job_16619542960420_0004-1-00001.parquet" |
| file_avro2 = "871d1473-8566-46c0-a530-a2256b3f396f-m0.avro" |
| self.filesystem_client.copy_from_local( |
| SRC_DIR.format('data', file_parq2), DATA_PATH) |
| self.filesystem_client.copy_from_local( |
| SRC_DIR.format('metadata', file_avro2), METADATA_PATH) |
| assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2)) |
| assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2)) |
| |
      # Execute REMOVE_ORPHAN_FILES at a specific timestamp.
| result = impalad_client.execute( |
| "ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES('{1}')".format(db_tbl, cp1_time)) |
| assert result.data[0] == 'Remove orphan files executed.' |
| assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq1)) |
      assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro1))
| assert self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2)) |
| assert self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2)) |
| |
| # Execute REMOVE_ORPHAN_FILES at now(). |
| result = impalad_client.execute( |
| "ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES(now())".format(db_tbl)) |
| assert result.data[0] == 'Remove orphan files executed.' |
| assert not self.filesystem_client.exists(os.path.join(DATA_PATH, file_parq2)) |
      assert not self.filesystem_client.exists(os.path.join(METADATA_PATH, file_avro2))
| |
      # Assert that the table is still queryable.
| result = impalad_client.execute('select i from {} order by i'.format(db_tbl)) |
| assert result.data == ['1', '2', '3'] |
| |
| def test_describe_history_params(self, unique_database): |
| tbl_name = unique_database + ".describe_history" |
| |
| # We are setting the TIMEZONE query option in this test, so let's create a local |
| # impala client. |
| with self.create_impala_client() as impalad_client: |
| # Iceberg doesn't create a snapshot entry for the initial empty table |
| impalad_client.execute("create table {0} (i int) stored as iceberg" |
| .format(tbl_name)) |
| insert_q = "insert into {0} values (1)".format(tbl_name) |
| ts_1 = self.execute_query_ts(impalad_client, insert_q) |
| time.sleep(1) |
| ts_2 = self.execute_query_ts(impalad_client, insert_q) |
| time.sleep(1) |
| ts_3 = self.execute_query_ts(impalad_client, insert_q) |
| # Describe history without predicate |
| data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name)) |
| assert len(data.data) == 3 |
| |
| # Describe history with FROM predicate |
| self.expect_num_snapshots_from(impalad_client, tbl_name, |
| ts_1 - datetime.timedelta(hours=1), 3) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 2) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0) |
| |
| # Describe history with BETWEEN <ts> AND <ts> predicate |
| self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1) |
| self.expect_results_between(impalad_client, tbl_name, |
| ts_1 - datetime.timedelta(hours=1), ts_2, 2) |
| self.expect_results_between(impalad_client, tbl_name, |
| ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3) |
| |
      # Check that the timestamp is interpreted in the local timezone controlled by the
      # TIMEZONE query option. Persist the local times first and create a new snapshot.
| impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") |
| now_tokyo = self.impala_now(impalad_client) |
| impalad_client.execute("SET TIMEZONE='Europe/Budapest'") |
| now_budapest = self.impala_now(impalad_client) |
| self.execute_query_ts(impalad_client, "insert into {0} values (4)".format(tbl_name)) |
| self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 1) |
| |
      # Let's switch to Tokyo time. Tokyo local time is always ahead of Budapest time.
| impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") |
| self.expect_num_snapshots_from(impalad_client, tbl_name, now_tokyo, 1) |
| |
| # Interpreting Budapest time in Tokyo time points to the past. |
| self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 4) |
| |
| def test_time_travel(self, unique_database): |
| tbl_name = unique_database + ".time_travel" |
| |
| def expect_results(query, expected_results, expected_cols): |
| data = impalad_client.execute(query) |
| assert len(data.data) == len(expected_results) |
| for r in expected_results: |
| assert r in data.data |
| expected_col_labels = expected_cols['labels'] |
| expected_col_types = expected_cols['types'] |
| assert data.column_labels == expected_col_labels |
| assert data.column_types == expected_col_types |
| |
| def expect_for_count_star(query, expected): |
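      # A plain count(*) should be answered from Iceberg metadata alone, so the scan
      # must not read any row groups or file metadata.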
| data = impalad_client.execute(query) |
| assert len(data.data) == 1 |
| assert expected in data.data |
| assert "NumRowGroups" not in data.runtime_profile |
| assert "NumFileMetadataRead" not in data.runtime_profile |
| |
| def expect_results_t(ts, expected_results, expected_cols): |
| expect_results( |
| "select * from {0} for system_time as of {1}".format(tbl_name, ts), |
| expected_results, expected_cols) |
| |
| def expect_for_count_star_t(ts, expected): |
| expect_for_count_star( |
| "select count(*) from {0} for system_time as of {1}".format(tbl_name, ts), |
| expected) |
| |
| def expect_snapshot_id_in_plan_t(ts, snapshot_id): |
| data = impalad_client.execute( |
| "explain select * from {0} for system_time as of {1}".format( |
| tbl_name, ts)) |
| assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data |
| |
| def expect_results_v(snapshot_id, expected_results, expected_cols): |
| expect_results( |
| "select * from {0} for system_version as of {1}".format(tbl_name, snapshot_id), |
| expected_results, expected_cols) |
| |
| def expect_for_count_star_v(snapshot_id, expected): |
| expect_for_count_star( |
| "select count(*) from {0} for system_version as of {1}".format( |
| tbl_name, snapshot_id), |
| expected) |
| |
| def expect_snapshot_id_in_plan_v(snapshot_id): |
| data = impalad_client.execute( |
| "explain select * from {0} for system_version as of {1}".format( |
| tbl_name, snapshot_id)) |
| assert " Iceberg snapshot id: {0}".format(snapshot_id) in data.data |
| |
| def impala_now(): |
| now_data = impalad_client.execute("select now()") |
| return now_data.data[0] |
| |
| # We are setting the TIMEZONE query option in this test, so let's create a local |
| # impala client. |
| with self.create_impala_client() as impalad_client: |
| # Iceberg doesn't create a snapshot entry for the initial empty table |
| impalad_client.execute("create table {0} (i int) stored as iceberg" |
| .format(tbl_name)) |
| ts_1 = self.execute_query_ts(impalad_client, "insert into {0} values (1)" |
| .format(tbl_name)) |
| ts_2 = self.execute_query_ts(impalad_client, "insert into {0} values (2)" |
| .format(tbl_name)) |
| ts_3 = self.execute_query_ts(impalad_client, "truncate table {0}".format(tbl_name)) |
| time.sleep(5) |
| ts_4 = self.execute_query_ts(impalad_client, "insert into {0} values (100)" |
| .format(tbl_name)) |
| ts_no_ss = self.execute_query_ts(impalad_client, |
| "alter table {0} add column {1} bigint" |
| .format(tbl_name, "j")) |
| ts_5 = self.execute_query_ts(impalad_client, "insert into {0} (i,j) values (3, 103)" |
| .format(tbl_name)) |
| |
| # Descriptions of the different schemas we expect to see as Time Travel queries |
| # use the schema from the specified time or snapshot. |
| # |
| # When the schema is just the 'J' column. |
| j_cols = { |
| 'labels': ['J'], |
| 'types': ['BIGINT'] |
| } |
| # When the schema is just the 'I' column. |
| i_cols = { |
| 'labels': ['I'], |
| 'types': ['INT'] |
| } |
| # When the schema is the 'I' and 'J' columns. |
| ij_cols = { |
| 'labels': ['I', 'J'], |
| 'types': ['INT', 'BIGINT'] |
| } |
| |
| # Query table as of timestamps. |
| expect_results_t("now()", ['100\tNULL', '3\t103'], ij_cols) |
| expect_results_t(quote(ts_1), ['1'], i_cols) |
| expect_results_t(quote(ts_2), ['1', '2'], i_cols) |
| expect_results_t(quote(ts_3), [], i_cols) |
| expect_results_t(cast_ts(ts_3) + " + interval 1 seconds", [], i_cols) |
| expect_results_t(quote(ts_4), ['100'], i_cols) |
| expect_results_t(cast_ts(ts_4) + " - interval 5 seconds", [], i_cols) |
| # There is no new snapshot created by the schema change between ts_4 and ts_no_ss. |
| # So at ts_no_ss we see the schema as of ts_4 |
| expect_results_t(quote(ts_no_ss), ['100'], i_cols) |
| expect_results_t(quote(ts_5), ['100\tNULL', '3\t103'], ij_cols) |
| # Future queries return the current snapshot. |
| expect_results_t(cast_ts(ts_5) + " + interval 1 hours", ['100\tNULL', '3\t103'], |
| ij_cols) |
| |
| # Query table as of snapshot IDs. |
| snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5) |
| expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols) |
| expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols) |
| expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols) |
| expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols) |
| expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols) |
| |
| expect_snapshot_id_in_plan_v(snapshots[0].get_snapshot_id()) |
| expect_snapshot_id_in_plan_v(snapshots[1].get_snapshot_id()) |
| expect_snapshot_id_in_plan_v(snapshots[2].get_snapshot_id()) |
| expect_snapshot_id_in_plan_v(snapshots[3].get_snapshot_id()) |
| expect_snapshot_id_in_plan_v(snapshots[4].get_snapshot_id()) |
| |
| expect_snapshot_id_in_plan_t(quote(ts_1), snapshots[0].get_snapshot_id()) |
| expect_snapshot_id_in_plan_t(quote(ts_2), snapshots[1].get_snapshot_id()) |
| expect_snapshot_id_in_plan_t(quote(ts_3), snapshots[2].get_snapshot_id()) |
| expect_snapshot_id_in_plan_t(quote(ts_4), snapshots[3].get_snapshot_id()) |
| expect_snapshot_id_in_plan_t(quote(ts_5), snapshots[4].get_snapshot_id()) |
| |
| # Test of plain count star optimization |
| # 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile |
| expect_for_count_star_t("now()", '2') |
| expect_for_count_star_t(quote(ts_1), '1') |
| expect_for_count_star_t(quote(ts_2), '2') |
| expect_for_count_star_t(quote(ts_3), '0') |
| expect_for_count_star_t(cast_ts(ts_3) + " + interval 1 seconds", '0') |
| expect_for_count_star_t(quote(ts_4), '1') |
| expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0') |
| expect_for_count_star_t(cast_ts(ts_5), '2') |
| expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2') |
| expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1') |
| expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2') |
| expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0') |
| expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1') |
| expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2') |
| |
| # SELECT diff |
| expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' |
| MINUS |
| SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format( |
| tbl=tbl_name, ts_new=ts_2, ts_old=ts_1), |
| ['2'], i_cols) |
| expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new} |
| MINUS |
| SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format( |
| tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), |
| v_old=snapshots[0].get_snapshot_id()), |
| ['2'], i_cols) |
| # Mix SYSTEM_TIME and SYSTEM_VERSION |
| expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new} |
| MINUS |
| SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format( |
| tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1), |
| ['2'], i_cols) |
| expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' |
| MINUS |
| SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format( |
| tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()), |
| ['2'], i_cols) |
| expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' |
| MINUS |
| SELECT *, NULL FROM {tbl} FOR SYSTEM_TIME |
| AS OF '{ts_old}'""".format( |
| tbl=tbl_name, ts_new=ts_5, ts_old=ts_4), |
| ['3\t103'], ij_cols) |
| |
| # Query old snapshot |
| try: |
| impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_TIME AS OF {1}".format( |
| tbl_name, "now() - interval 2 years")) |
| assert False # Exception must be thrown |
| except Exception as e: |
| assert "Cannot find a snapshot older than" in str(e) |
| # Query invalid snapshot |
| try: |
| impalad_client.execute("SELECT * FROM {0} FOR SYSTEM_VERSION AS OF 42".format( |
| tbl_name)) |
| assert False # Exception must be thrown |
| except Exception as e: |
| assert "Cannot find snapshot with ID 42" in str(e) |
| |
| # Go back to one column |
| impalad_client.execute("alter table {0} drop column i".format(tbl_name)) |
| |
| # Test that deleted column is not selectable. |
| try: |
| impalad_client.execute("SELECT i FROM {0}".format(tbl_name)) |
| assert False # Exception must be thrown |
| except Exception as e: |
| assert "Could not resolve column/field reference: 'i'" in str(e) |
| |
| # Back at ts_2 the deleted 'I' column is there |
| expect_results("SELECT * FROM {0} FOR SYSTEM_TIME AS OF '{1}'". |
| format(tbl_name, ts_2), ['1', '2'], i_cols) |
| expect_results("SELECT i FROM {0} FOR SYSTEM_TIME AS OF '{1}'". |
| format(tbl_name, ts_2), ['1', '2'], i_cols) |
| |
      # Check that the timestamp is interpreted in the local timezone controlled by the
      # TIMEZONE query option.
| impalad_client.execute("truncate table {0}".format(tbl_name)) |
| impalad_client.execute("insert into {0} values (1111)".format(tbl_name)) |
| impalad_client.execute("SET TIMEZONE='Europe/Budapest'") |
| now_budapest = impala_now() |
| expect_results_t(quote(now_budapest), ['1111'], j_cols) |
| |
      # Let's switch to Tokyo time. Tokyo local time is always ahead of Budapest time.
| impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") |
| now_tokyo = impala_now() |
| expect_results_t(quote(now_tokyo), ['1111'], j_cols) |
| try: |
| # Interpreting Budapest time in Tokyo time points to the past when the table |
| # didn't exist. |
| expect_results_t(quote(now_budapest), [], j_cols) |
| assert False |
| except Exception as e: |
| assert "Cannot find a snapshot older than" in str(e) |
| |
| def test_time_travel_queries(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-time-travel', vector, use_db=unique_database) |
| |
| @SkipIf.not_dfs |
| def test_strings_utf8(self, unique_database): |
| # Create table |
| table_name = "ice_str_utf8" |
| qualified_table_name = "%s.%s" % (unique_database, table_name) |
| query = 'create table %s (a string) stored as iceberg' % qualified_table_name |
| self.client.execute(query) |
| |
| # Inserted string data should have UTF8 annotation regardless of query options. |
| query = 'insert into %s values ("impala")' % qualified_table_name |
| self.execute_query(query, {'parquet_annotate_strings_utf8': False}) |
| |
| # Copy the created file to the local filesystem and parse metadata |
| local_file = '/tmp/iceberg_utf8_test_%s.parq' % random.randint(0, 10000) |
| LOG.info("test_strings_utf8 local file name: " + local_file) |
| hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/data/*.parq' |
| % (unique_database, table_name)) |
| check_call(['hadoop', 'fs', '-copyToLocal', hdfs_file, local_file]) |
| metadata = get_parquet_metadata(local_file) |
| |
    # Extract the SchemaElement corresponding to the table column
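    # metadata.schema[0] is the root schema element, so the single table column 'a' is
    # at index 1.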
| a_schema_element = metadata.schema[1] |
| assert a_schema_element.name == 'a' |
| |
| # Check that the schema uses the UTF8 annotation |
| assert a_schema_element.converted_type == ConvertedType.UTF8 |
| |
| os.remove(local_file) |
| |
| # Get hdfs path to manifest list that belongs to the snapshot identified by |
| # 'snapshot_counter'. |
| def get_manifest_list_hdfs_path(self, tmp_path_prefix, db_name, table_name, |
| snapshot_counter): |
| local_path = '%s_%s.metadata.json' % (tmp_path_prefix, random.randint(0, 10000)) |
| hdfs_path = get_fs_path('/test-warehouse/%s.db/%s/metadata/%s*.metadata.json' |
| % (db_name, table_name, snapshot_counter)) |
| check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path]) |
| |
| manifest_list_hdfs_path = None |
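    # The metadata JSON lists every snapshot; return the manifest list location of the
    # current snapshot.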
| try: |
| with open(local_path, 'r') as fp: |
| metadata = json.load(fp) |
| current_snapshot_id = metadata['current-snapshot-id'] |
| for snapshot in metadata['snapshots']: |
| if snapshot['snapshot-id'] == current_snapshot_id: |
| manifest_list_hdfs_path = snapshot['manifest-list'] |
| break |
| finally: |
| os.remove(local_path) |
| return manifest_list_hdfs_path |
| |
| # Get list of hdfs paths to manifest files from the manifest list avro file. |
| def get_manifest_hdfs_path_list(self, tmp_path_prefix, manifest_list_hdfs_path): |
| local_path = '%s_%s.manifest_list.avro' % (tmp_path_prefix, random.randint(0, 10000)) |
| check_call(['hadoop', 'fs', '-copyToLocal', manifest_list_hdfs_path, local_path]) |
| |
| manifest_hdfs_path_list = [] |
| reader = None |
| try: |
| with open(local_path, 'rb') as fp: |
| reader = DataFileReader(fp, DatumReader()) |
| for manifest in reader: |
| manifest_hdfs_path_list.append(manifest['manifest_path']) |
| finally: |
| if reader: |
| reader.close() |
| os.remove(local_path) |
| return manifest_hdfs_path_list |
| |
| # Get 'data_file' structs from avro manifest files. |
| def get_data_file_list(self, tmp_path_prefix, manifest_hdfs_path_list): |
| datafiles = [] |
| for hdfs_path in manifest_hdfs_path_list: |
| local_path = '%s_%s.manifest.avro' % (tmp_path_prefix, random.randint(0, 10000)) |
| check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path]) |
| |
| reader = None |
| try: |
| with open(local_path, 'rb') as fp: |
| reader = DataFileReader(fp, DatumReader()) |
| datafiles.extend([rec['data_file'] for rec in reader]) |
| finally: |
| if reader: |
| reader.close() |
| os.remove(local_path) |
| return datafiles |
| |
| def get_latest_metadata_path(self, database_name, table_name): |
| describe = 'describe extended %s.%s ' % (database_name, table_name) |
| output = self.client.execute(describe) |
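    # DESCRIBE EXTENDED prints table properties as tab-separated rows; the
    # 'metadata_location' property points to the latest metadata JSON file.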
| metadata_location = [s for s in output.data if s.startswith('\tmetadata_location')] |
| assert len(metadata_location) == 1 |
| metadata_location_split = metadata_location[0].split('\t') |
| assert len(metadata_location_split) == 3 |
| metadata_path = metadata_location_split[2] |
| return metadata_path |
| |
| # Get the current partition spec as JSON from the latest metadata file |
| def get_current_partition_spec(self, database_name, table_name): |
| hdfs_path = self.get_latest_metadata_path(database_name, table_name) |
| output = check_output(['hadoop', 'fs', '-cat', hdfs_path]) |
| |
| current_partition_spec = None |
| metadata = json.loads(output) |
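    # The metadata JSON stores every historical partition spec; return the one whose
    # spec-id matches 'default-spec-id'.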
| |
| current_spec_id = metadata['default-spec-id'] |
| for spec in metadata['partition-specs']: |
| if spec['spec-id'] == current_spec_id: |
| current_partition_spec = spec |
| break |
| return current_partition_spec |
| |
| @SkipIf.not_dfs |
| def test_partition_spec_update_v1(self, unique_database): |
| # Create table |
| table_name = "ice_part" |
| qualified_table_name = "%s.%s" % (unique_database, table_name) |
| create_table = """create table {} |
| (s string, i int) partitioned by spec(truncate(5, s), identity(i)) |
| stored as iceberg |
| tblproperties ('format-version'='1')""".format(qualified_table_name) |
| self.client.execute(create_table) |
| |
| partition_spec = self.get_current_partition_spec(unique_database, table_name) |
| assert len(partition_spec['fields']) == 2 |
| truncate_s = partition_spec['fields'][0] |
| identity_i = partition_spec['fields'][1] |
    # At table creation, partition names do not contain the parameter value.
| assert truncate_s['name'] == 's_trunc' |
| assert identity_i['name'] == 'i' |
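    # Iceberg assigns partition field-ids starting at 1000.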
| assert truncate_s['field-id'] == 1000 |
| assert identity_i['field-id'] == 1001 |
| |
| # Partition evolution |
| partition_evolution = 'alter table %s set partition ' \ |
| 'spec(identity(i), truncate(6,s))' % qualified_table_name |
| self.client.execute(partition_evolution) |
| |
| # V1 partition evolution keeps the old, modified fields, but changes their |
| # transform to VOID. |
| evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name) |
| assert len(evolved_partition_spec['fields']) == 3 |
| old_truncate_s = evolved_partition_spec['fields'][0] |
| identity_i = evolved_partition_spec['fields'][1] |
| truncate_s = evolved_partition_spec['fields'][2] |
| assert old_truncate_s['name'] == 's_trunc' |
| assert identity_i['name'] == 'i' |
| # Modified field name contains the parameter value. |
| assert truncate_s['name'] == 's_trunc_6' |
| assert old_truncate_s['field-id'] == 1000 |
| assert identity_i['field-id'] == 1001 |
| # field-id increases for the modified field. |
| assert truncate_s['field-id'] == 1002 |
| assert old_truncate_s['transform'] == 'void' |
| |
| @SkipIf.not_dfs |
| def test_partition_spec_update_v2(self, unique_database): |
| # Create table |
| table_name = "ice_part" |
| qualified_table_name = "%s.%s" % (unique_database, table_name) |
| create_table = 'create table %s ' \ |
| '(s string, i int) partitioned by spec(truncate(5, s), identity(i)) ' \ |
| 'stored as iceberg tblproperties ("format-version" = "2")' \ |
| % qualified_table_name |
| self.client.execute(create_table) |
| |
| partition_spec = self.get_current_partition_spec(unique_database, table_name) |
| assert len(partition_spec['fields']) == 2 |
| truncate_s = partition_spec['fields'][0] |
| identity_i = partition_spec['fields'][1] |
    # At table creation, partition names do not contain the parameter value.
| assert truncate_s['name'] == 's_trunc' |
| assert identity_i['name'] == 'i' |
| assert truncate_s['field-id'] == 1000 |
| assert identity_i['field-id'] == 1001 |
| |
| # Partition evolution for s |
| partition_evolution = 'alter table %s set partition ' \ |
| 'spec(identity(i), truncate(6,s))' % qualified_table_name |
| self.client.execute(partition_evolution) |
| |
| # V2 partition evolution updates the previous partitioning for |
| # the modified field. |
| evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name) |
| assert len(evolved_partition_spec['fields']) == 2 |
| identity_i = evolved_partition_spec['fields'][0] |
| truncate_s = evolved_partition_spec['fields'][1] |
| assert identity_i['name'] == 'i' |
| # Modified field name contains the parameter value. |
| assert truncate_s['name'] == 's_trunc_6' |
| assert identity_i['field-id'] == 1001 |
| # field-id increased for the modified field. |
| assert truncate_s['field-id'] == 1002 |
| |
| # Partition evolution for i and s |
| partition_evolution = 'alter table %s set partition ' \ |
| 'spec(bucket(4, i), truncate(3,s))' \ |
| % qualified_table_name |
| self.client.execute(partition_evolution) |
| |
| evolved_partition_spec = self.get_current_partition_spec(unique_database, table_name) |
| assert len(evolved_partition_spec['fields']) == 2 |
| bucket_i = evolved_partition_spec['fields'][0] |
| truncate_s = evolved_partition_spec['fields'][1] |
| # The field naming follows the transform changes. |
| assert bucket_i['name'] == 'i_bucket_4' |
| assert truncate_s['name'] == 's_trunc_3' |
    # field-ids increase with each modification.
| assert bucket_i['field-id'] == 1003 |
| assert truncate_s['field-id'] == 1004 |
| |
| @SkipIf.not_dfs |
| def test_writing_metrics_to_metadata_v1(self, unique_database): |
| self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v1', '1') |
| |
| @SkipIf.not_dfs |
| def test_writing_metrics_to_metadata_v2(self, unique_database): |
| self._test_writing_metrics_to_metadata_impl(unique_database, 'ice_stats_v2', '2') |
| |
| def _test_writing_metrics_to_metadata_impl(self, unique_database, table_name, version): |
| # Create table |
| qualified_table_name = "%s.%s" % (unique_database, table_name) |
| query = """create table {} |
| (s string, i int, b boolean, bi bigint, ts timestamp, dt date, |
| dc decimal(10, 3)) |
| stored as iceberg |
| tblproperties ('format-version'='{}')""".format(qualified_table_name, version) |
| self.client.execute(query) |
| # Insert data |
| # 1st data file: |
| query = 'insert into %s values ' \ |
| '("abc", 3, true, NULL, "1970-01-03 09:11:22", NULL, 56.34), ' \ |
| '("def", NULL, false, NULL, "1969-12-29 14:45:59", DATE"1969-01-01", -10.0), ' \ |
| '("ghij", 1, NULL, 123456789000000, "1970-01-01", DATE"1970-12-31", NULL), ' \ |
| '(NULL, 0, NULL, 234567890000001, NULL, DATE"1971-01-01", NULL)' \ |
| % qualified_table_name |
| self.execute_query(query) |
| # 2nd data file: |
| query = 'insert into %s values ' \ |
| '(NULL, NULL, NULL, NULL, NULL, NULL, NULL), ' \ |
| '(NULL, NULL, NULL, NULL, NULL, NULL, NULL)' \ |
| % qualified_table_name |
| self.execute_query(query) |
| |
| # Get hdfs path to manifest list file |
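    # Two INSERTs were executed, so the current metadata file is expected to carry the
    # '00002' version prefix.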
| manifest_list_hdfs_path = self.get_manifest_list_hdfs_path( |
| '/tmp/iceberg_metrics_test', unique_database, table_name, '00002') |
| |
| # Get the list of hdfs paths to manifest files |
| assert manifest_list_hdfs_path is not None |
| manifest_hdfs_path_list = self.get_manifest_hdfs_path_list( |
| '/tmp/iceberg_metrics_test', manifest_list_hdfs_path) |
| |
| # Get 'data_file' records from manifest files. |
| assert manifest_hdfs_path_list is not None and len(manifest_hdfs_path_list) > 0 |
| datafiles = self.get_data_file_list('/tmp/iceberg_metrics_test', |
| manifest_hdfs_path_list) |
| |
| # Check column stats in datafiles |
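    # Each of the two INSERT statements wrote a single data file.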
| assert datafiles is not None and len(datafiles) == 2 |
| |
| # The 1st datafile contains the 2 NULL rows |
| assert datafiles[0]['record_count'] == 2 |
| assert datafiles[0]['column_sizes'] == \ |
| [{'key': 1, 'value': 39}, |
| {'key': 2, 'value': 39}, |
| {'key': 3, 'value': 25}, |
| {'key': 4, 'value': 39}, |
| {'key': 5, 'value': 39}, |
| {'key': 6, 'value': 39}, |
| {'key': 7, 'value': 39}] |
| assert datafiles[0]['value_counts'] == \ |
| [{'key': 1, 'value': 2}, |
| {'key': 2, 'value': 2}, |
| {'key': 3, 'value': 2}, |
| {'key': 4, 'value': 2}, |
| {'key': 5, 'value': 2}, |
| {'key': 6, 'value': 2}, |
| {'key': 7, 'value': 2}] |
| assert datafiles[0]['null_value_counts'] == \ |
| [{'key': 1, 'value': 2}, |
| {'key': 2, 'value': 2}, |
| {'key': 3, 'value': 2}, |
| {'key': 4, 'value': 2}, |
| {'key': 5, 'value': 2}, |
| {'key': 6, 'value': 2}, |
| {'key': 7, 'value': 2}] |
| # Upper/lower bounds should be empty lists |
| assert datafiles[0]['lower_bounds'] == [] |
| assert datafiles[0]['upper_bounds'] == [] |
| |
| # 2nd datafile |
| assert datafiles[1]['record_count'] == 4 |
| assert datafiles[1]['column_sizes'] == \ |
| [{'key': 1, 'value': 66}, |
| {'key': 2, 'value': 56}, |
| {'key': 3, 'value': 26}, |
| {'key': 4, 'value': 59}, |
| {'key': 5, 'value': 68}, |
| {'key': 6, 'value': 56}, |
| {'key': 7, 'value': 53}] |
| assert datafiles[1]['value_counts'] == \ |
| [{'key': 1, 'value': 4}, |
| {'key': 2, 'value': 4}, |
| {'key': 3, 'value': 4}, |
| {'key': 4, 'value': 4}, |
| {'key': 5, 'value': 4}, |
| {'key': 6, 'value': 4}, |
| {'key': 7, 'value': 4}] |
| assert datafiles[1]['null_value_counts'] == \ |
| [{'key': 1, 'value': 1}, |
| {'key': 2, 'value': 1}, |
| {'key': 3, 'value': 2}, |
| {'key': 4, 'value': 2}, |
| {'key': 5, 'value': 1}, |
| {'key': 6, 'value': 1}, |
| {'key': 7, 'value': 2}] |
| assert datafiles[1]['lower_bounds'] == \ |
| [{'key': 1, 'value': b'abc'}, |
| # INT is serialized as 4-byte little endian |
| {'key': 2, 'value': b'\x00\x00\x00\x00'}, |
| # BOOLEAN is serialized as 0x00 for FALSE |
| {'key': 3, 'value': b'\x00'}, |
| # BIGINT is serialized as 8-byte little endian |
| {'key': 4, 'value': b'\x40\xaf\x0d\x86\x48\x70\x00\x00'}, |
| # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since |
| # 1970-01-01 00:00:00) |
| {'key': 5, 'value': b'\xc0\xd7\xff\x06\xd0\xff\xff\xff'}, |
| # DATE is serialized as 4-byte little endian (number of days since 1970-01-01) |
| {'key': 6, 'value': b'\x93\xfe\xff\xff'}, |
| # Unlike other numerical values, DECIMAL is serialized as big-endian. |
| {'key': 7, 'value': b'\xd8\xf0'}] |
| assert datafiles[1]['upper_bounds'] == \ |
| [{'key': 1, 'value': b'ghij'}, |
| # INT is serialized as 4-byte little endian |
| {'key': 2, 'value': b'\x03\x00\x00\x00'}, |
| # BOOLEAN is serialized as 0x01 for TRUE |
| {'key': 3, 'value': b'\x01'}, |
| # BIGINT is serialized as 8-byte little endian |
| {'key': 4, 'value': b'\x81\x58\xc2\x97\x56\xd5\x00\x00'}, |
| # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since |
| # 1970-01-01 00:00:00) |
| {'key': 5, 'value': b'\x80\x02\x86\xef\x2f\x00\x00\x00'}, |
| # DATE is serialized as 4-byte little endian (number of days since 1970-01-01) |
| {'key': 6, 'value': b'\x6d\x01\x00\x00'}, |
| # Unlike other numerical values, DECIMAL is serialized as big-endian. |
| {'key': 7, 'value': b'\x00\xdc\x14'}] |
| |
| def test_using_upper_lower_bound_metrics(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector, |
| use_db=unique_database) |
| |
| def test_writing_many_files(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-write-many-files', vector, |
| use_db=unique_database) |
| |
| @pytest.mark.execute_serially |
| def test_writing_many_files_stress(self, vector, unique_database): |
| if self.exploration_strategy() != 'exhaustive': |
| pytest.skip('runs only in exhaustive') |
| self.run_test_case('QueryTest/iceberg-write-many-files-stress', vector, |
| use_db=unique_database) |
| |
| @pytest.mark.execute_serially |
| def test_table_load_time_for_many_files(self, unique_database): |
| if self.exploration_strategy() != 'exhaustive': |
| pytest.skip('runs only in exhaustive') |
| tbl_name = unique_database + ".iceberg_many_files" |
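    # bucket(2039, l_orderkey) spreads lineitem across many partitions and data files,
    # which stresses table metadata loading.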
| self.execute_query("""CREATE TABLE {} |
| PARTITIONED BY SPEC (bucket(2039, l_orderkey)) |
| STORED AS ICEBERG |
| AS SELECT * FROM tpch_parquet.lineitem""".format(tbl_name)) |
| self.execute_query("invalidate metadata") |
| start_time = time.time() |
| self.execute_query("describe formatted {}".format(tbl_name)) |
| elapsed_time = time.time() - start_time |
| if IS_HDFS: |
| time_limit = 10 |
| else: |
| time_limit = 20 |
| assert elapsed_time < time_limit |
| |
| def test_consistent_scheduling(self, unique_database): |
| """IMPALA-10914: This test verifies that Impala schedules scan ranges consistently for |
| Iceberg tables.""" |
| def collect_split_stats(profile): |
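      # Collect the per-node 'Hdfs split stats' lines and sort them so profiles can be
      # compared regardless of line order.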
| splits = [s.strip() for s in profile.splitlines() if "Hdfs split stats" in s] |
| splits.sort() |
| return splits |
| |
| with self.create_impala_client() as impalad_client: |
| impalad_client.execute("use " + unique_database) |
| impalad_client.execute("""create table line_ice stored as iceberg |
| as select * from tpch_parquet.lineitem""") |
| first_result = impalad_client.execute("""select count(*) from line_ice""") |
| ref_profile = first_result.runtime_profile |
| ref_split_stats = collect_split_stats(ref_profile) |
| |
| for i in range(0, 10): |
| # Subsequent executions of the same query should schedule scan ranges similarly. |
| result = impalad_client.execute("""select count(*) from line_ice""") |
| profile = result.runtime_profile |
| split_stats = collect_split_stats(profile) |
| assert ref_split_stats == split_stats |
| |
| def test_scheduling_partitioned_tables(self, unique_database): |
| """IMPALA-12765: Balance consecutive partitions better for Iceberg tables""" |
| # We are setting the replica_preference query option in this test, so let's create a |
| # local impala client. |
| inventory_tbl = "inventory_ice" |
| item_tbl = "item_ice" |
| date_dim_tbl = "date_dim_ice" |
| with self.create_impala_client() as impalad_client: |
| impalad_client.execute("use " + unique_database) |
| impalad_client.execute("set replica_preference=remote") |
| impalad_client.execute(""" |
| CREATE TABLE {} |
| PARTITIONED BY SPEC (inv_date_sk) |
| STORED BY ICEBERG |
| AS SELECT * from tpcds_partitioned_parquet_snap.inventory; |
| """.format(inventory_tbl)) |
| impalad_client.execute(""" |
| CREATE TABLE {} |
| STORED BY ICEBERG |
| AS SELECT * from tpcds_partitioned_parquet_snap.item; |
| """.format(item_tbl)) |
| impalad_client.execute(""" |
| CREATE TABLE {} |
| STORED BY ICEBERG |
| AS SELECT * from tpcds_partitioned_parquet_snap.date_dim; |
| """.format(date_dim_tbl)) |
| q22_result = impalad_client.execute(""" |
| select i_product_name, i_brand, i_class, i_category, |
| avg(inv_quantity_on_hand) qoh |
| from inventory_ice, date_dim_ice, item_ice |
| where inv_date_sk=d_date_sk and |
| inv_item_sk=i_item_sk and |
| d_month_seq between 1199 and 1199 + 11 |
| group by rollup(i_product_name, i_brand, i_class, i_category) |
| order by qoh, i_product_name, i_brand, i_class, i_category |
| limit 100 |
| """) |
| profile = q22_result.runtime_profile |
| # "Files rejected:" contains the number of files being rejected by runtime |
| # filters. With IMPALA-12765 we should see similar numbers for each executor. |
| files_rejected_array = re.findall(r"Files rejected: \d+ \((\d+)\)", profile) |
| avg_files_rejected = int(files_rejected_array[0]) |
| THRESHOLD = 3 |
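| # Use the first captured value as the reference and require every non-zero |
| # per-executor count to stay within THRESHOLD of it. |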
| for files_rejected_str in files_rejected_array: |
| files_rejected = int(files_rejected_str) |
| if files_rejected != 0: |
| assert abs(avg_files_rejected - files_rejected) < THRESHOLD |
| |
| def test_in_predicate_push_down(self, vector, unique_database): |
| self.execute_query("SET RUNTIME_FILTER_MODE=OFF") |
| self.run_test_case('QueryTest/iceberg-in-predicate-push-down', vector, |
| use_db=unique_database) |
| |
| def test_is_null_predicate_push_down(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-is-null-predicate-push-down', vector, |
| use_db=unique_database) |
| |
| def test_compound_predicate_push_down(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-compound-predicate-push-down', vector, |
| use_db=unique_database) |
| |
| def test_plain_count_star_optimization(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-plain-count-star-optimization', vector, |
| use_db=unique_database) |
| |
| def test_create_table_like_table(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-create-table-like-table', vector, |
| use_db=unique_database) |
| |
| def test_table_owner(self, vector, unique_database): |
| self.run_table_owner_test(vector, unique_database, "some_random_user") |
| self.run_table_owner_test(vector, unique_database, "another_random_user") |
| |
| def run_table_owner_test(self, vector, db_name, user_name): |
| # Create Iceberg table with a given user running the query. |
| tbl_name = "iceberg_table_owner" |
| sql_stmt = 'CREATE TABLE {0}.{1} (i int) STORED AS ICEBERG'.format( |
| db_name, tbl_name) |
| args = ['-u', user_name, '-q', sql_stmt] |
| run_impala_shell_cmd(vector, args) |
| |
| # Run DESCRIBE FORMATTED to get the owner of the table. |
| args = ['-q', 'DESCRIBE FORMATTED {0}.{1}'.format(db_name, tbl_name)] |
| results = run_impala_shell_cmd(vector, args) |
| result_rows = results.stdout.strip().split('\n') |
| |
| # Find the output row with the owner. |
| owner_row = "" |
| for row in result_rows: |
| if "Owner:" in row: |
| owner_row = row |
| assert owner_row != "", "DESCRIBE output doesn't contain owner" + results.stdout |
| # Verify that the user running the query is the owner of the table. |
| assert user_name in owner_row, "Unexpected owner of Iceberg table. " + \ |
| "Expected user name: {0}. Actual output row: {1}".format(user_name, owner_row) |
| |
| args = ['-q', 'DROP TABLE {0}.{1}'.format(db_name, tbl_name)] |
| results = run_impala_shell_cmd(vector, args) |
| |
| def test_mixed_file_format(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-mixed-file-format', vector, |
| unique_database) |
| |
| def test_load(self, vector, unique_database): |
| """Test LOAD DATA INPATH for Iceberg tables, the first part of this method inits the |
| target directory, copies existing test data to HDFS. The second part runs the test |
| cases then cleans up the test directory. |
| """ |
| # Test 1-6 init: target orc/parquet file and directory |
| SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], |
| "testdata/data/iceberg_test/iceberg_mixed_file_format_test/data/{0}") |
| DST_DIR = "/tmp/" + unique_database + "/parquet/" |
| self.filesystem_client.make_dir(DST_DIR, permission=777) |
| file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \ |
| "fff150b6136a-job_16619542960420_0002-1-00001.parquet" |
| file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \ |
| "27ff880faff0-job_16619542960420_0004-1-00001.parquet" |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR) |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq2), DST_DIR) |
| DST_DIR = "/tmp/" + unique_database + "/orc/" |
| self.filesystem_client.make_dir(DST_DIR, permission=777) |
| file_orc1 = "00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-" \ |
| "ee500a19c1d1-job_16619542960420_0003-1-00001.orc" |
| file_orc2 = "00000-0-data-gfurnstahl_20220906114900_9c1b7b46-5643-428f-a007-" \ |
| "519c5500ed04-job_16619542960420_0004-1-00001.orc" |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc1), DST_DIR) |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file_orc2), DST_DIR) |
| # Test 7 init: overwrite |
| DST_DIR = "/tmp/" + unique_database + "/overwrite/" |
| self.filesystem_client.make_dir(DST_DIR, permission=777) |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR) |
| # Test 8 init: mismatching parquet schema format |
| SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/" |
| "iceberg_partitioned/data/event_time_hour=2020-01-01-08/action=view/{0}") |
| DST_DIR = "/tmp/" + unique_database + "/mismatching_schema/" |
| self.filesystem_client.make_dir(DST_DIR, permission=777) |
| file = "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet" |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR) |
| # Test 9 init: partitioned |
| DST_DIR = "/tmp/" + unique_database + "/partitioned/" |
| self.filesystem_client.make_dir(DST_DIR, permission=777) |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR) |
| # Test 10 init: hidden files |
| DST_DIR = "/tmp/" + unique_database + "/hidden/" |
| self.filesystem_client.make_dir(DST_DIR, permission=777) |
| self.filesystem_client.create_file(DST_DIR + "_hidden.1", "Test data 123") |
| self.filesystem_client.create_file(DST_DIR + "_hidden_2.1", "Test data 123") |
| self.filesystem_client.create_file(DST_DIR + ".hidden_3", "Test data 123") |
| self.filesystem_client.create_file(DST_DIR + ".hidden_4.1", "Test data 123") |
| self.filesystem_client.copy_from_local(SRC_DIR.format(file), DST_DIR) |
| |
| # Init test table |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_mixed_file_format_test", "parquet") |
| |
| # Execute tests |
| self.run_test_case('QueryTest/iceberg-load', vector, use_db=unique_database) |
| # Clean up temporary directory |
| self.filesystem_client.delete_file_dir("/tmp/{0}".format(unique_database), True) |
| |
| def test_table_sampling(self, vector): |
| self.run_test_case('QueryTest/iceberg-tablesample', vector, |
| use_db="functional_parquet") |
| |
| def _create_table_like_parquet_helper(self, vector, unique_database, tbl_name, |
| expect_success): |
| create_table_from_parquet(self.client, unique_database, tbl_name) |
| args = ['-q', "show files in {0}.{1}".format(unique_database, tbl_name)] |
| results = run_impala_shell_cmd(vector, args) |
| result_rows = results.stdout.strip().split('\n') |
| hdfs_file = None |
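| # The shell prints SHOW FILES results as a bordered table; pick the first row that |
| # contains a URI and take the file path from its first column. |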
| for row in result_rows: |
| if "://" in row: |
| hdfs_file = row.split('|')[1].lstrip() |
| break |
| assert hdfs_file |
| |
| iceberg_tbl_name = "iceberg_{0}".format(tbl_name) |
| sql_stmt = "create table {0}.{1} like parquet '{2}' stored as iceberg".format( |
| unique_database, iceberg_tbl_name, hdfs_file |
| ) |
| args = ['-q', sql_stmt] |
| |
| return run_impala_shell_cmd(vector, args, expect_success=expect_success) |
| |
| def test_create_table_like_parquet(self, vector, unique_database): |
| tbl_name = 'alltypes_tiny_pages' |
| # Not all types in this file are supported by Iceberg, so this is expected to fail. |
| self._create_table_like_parquet_helper(vector, unique_database, tbl_name, False) |
| |
| tbl_name = "create_table_like_parquet_test" |
| results = self._create_table_like_parquet_helper(vector, unique_database, tbl_name, |
| True) |
| result_rows = results.stdout.strip().split('\n') |
| assert result_rows[3].split('|')[1] == ' Table has been created. ' |
| |
| sql_stmt = "describe {0}.{1}".format(unique_database, tbl_name) |
| args = ['-q', sql_stmt] |
| parquet_results = run_impala_shell_cmd(vector, args) |
| parquet_result_rows = parquet_results.stdout.strip().split('\n') |
| |
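| # Skip the leading and trailing formatting lines of the shell output; keep only |
| # the column name and type fields of each row. |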
| parquet_column_name_type_list = [] |
| for row in parquet_result_rows[1:-2]: |
| parquet_column_name_type_list.append(row.split('|')[1:3]) |
| |
| sql_stmt = "describe {0}.iceberg_{1}".format(unique_database, tbl_name) |
| args = ['-q', sql_stmt] |
| iceberg_results = run_impala_shell_cmd(vector, args) |
| iceberg_result_rows = iceberg_results.stdout.strip().split('\n') |
| |
| iceberg_column_name_type_list = [] |
| for row in iceberg_result_rows[1:-2]: |
| iceberg_column_name_type_list.append(row.split('|')[1:3]) |
| |
| assert parquet_column_name_type_list == iceberg_column_name_type_list |
| |
| @SkipIfFS.hive |
| def test_hive_external_forbidden(self, unique_database): |
| tbl_name = unique_database + ".hive_ext" |
| error_msg = ("cannot be loaded because it is an EXTERNAL table in the HiveCatalog " |
| "that points to another table. Query the original table instead.") |
| self.execute_query("create table {0} (i int) stored by iceberg". |
| format(tbl_name)) |
| # 'iceberg.table_identifier' can refer to another table |
| self.run_stmt_in_hive("""alter table {0} set tblproperties |
| ('external.table.purge'='false', |
| 'iceberg.table_identifier'='functional_iceberg.iceberg_partitioned')""". |
| format(tbl_name)) |
| ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name)) |
| assert error_msg in str(ex) |
| # 'iceberg.mr.table.identifier' can refer to another table |
| self.run_stmt_in_hive(""" |
| alter table {0} unset tblproperties('iceberg.table_identifier')""". |
| format(tbl_name)) |
| self.run_stmt_in_hive("""alter table {0} set tblproperties |
| ('iceberg.mr.table.identifier'='functional_iceberg.iceberg_partitioned')""". |
| format(tbl_name)) |
| ex = self.execute_query_expect_failure(self.client, "refresh {0}".format(tbl_name)) |
| assert error_msg in str(ex) |
| # 'name' can also refer to another table but cannot be set by Hive/Impala. Also, |
| # during table migration both Impala and Hive clear the existing table properties. |
| # See IMPALA-12410. |
| |
| @SkipIfFS.incorrent_reported_ec |
| def test_compute_stats(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-compute-stats', vector, unique_database) |
| |
| def test_virtual_columns(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-virtual-columns', vector, unique_database) |
| |
| def test_avro_file_format(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-avro', vector, unique_database) |
| |
| def test_convert_table(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables', |
| vector, unique_database) |
| |
| def test_table_exists(self, unique_database): |
| """Test that iceberg AlreadyExistsException are correctly handled.""" |
| tbl_name = unique_database + ".create_iceberg_exists" |
| # Attempt to create an iceberg table, simulating AlreadyExistsException |
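| # The debug_action injects the given exception class and message at the |
| # CATALOGD_ICEBERG_CREATE debug point in the catalog. |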
| iceberg_created_options = {'debug_action': 'CATALOGD_ICEBERG_CREATE:EXCEPTION@' |
| 'IcebergAlreadyExistsException@Table was created concurrently'} |
| err = self.execute_query_expect_failure(self.client, |
| "create table {0} (i int) stored as iceberg".format(tbl_name), |
| query_options=iceberg_created_options) |
| assert "AlreadyExistsException: Table already exists" in str(err) |
| self.execute_query_expect_success(self.client, |
| "create table if not exists {0} (i int) stored as iceberg".format(tbl_name), |
| query_options=iceberg_created_options) |
| |
| def test_abort_transaction(self, unique_database): |
| """Test that iceberg operations fail correctly when an Iceberg transaction commit |
| fails, and that the effects of the failed operation are not visible.""" |
| tbl_name = unique_database + ".abort_iceberg_transaction" |
| # The query options that inject an iceberg transaction commit failure. |
| abort_ice_transaction_options = {'debug_action': |
| 'CATALOGD_ICEBERG_COMMIT:EXCEPTION@' |
| 'CommitFailedException@' |
| 'simulated commit failure'} |
| # Create an iceberg table and insert a row. |
| self.client.execute("""create table {0} (i int) |
| stored as iceberg""".format(tbl_name)) |
| self.execute_query_expect_success(self.client, |
| "insert into {0} values (1);".format(tbl_name)) |
| |
| # Run a query that would insert a row, but pass the query options that |
| # will cause the iceberg transaction to abort. |
| err = self.execute_query_expect_failure(self.client, |
| "insert into {0} values (2);".format(tbl_name), |
| query_options=abort_ice_transaction_options) |
| # Check that the error message looks reasonable. |
| result = str(err) |
| assert error_msg_startswith(result, |
| "ImpalaRuntimeException: simulated commit failure\n" |
| "CAUSED BY: CommitFailedException: simulated commit failure") |
| # Check that no data was inserted. |
| data = self.execute_query_expect_success(self.client, |
| "select * from {0}".format(tbl_name)) |
| assert data.column_labels == ['I'] |
| assert len(data.data) == 1 |
| assert data.data[0] == '1' |
| |
| # Run a query that would add a column to the table, but pass the query options that |
| # will cause the iceberg transaction to abort. |
| ddl_err = self.execute_query_expect_failure(self.client, |
| "alter table {0} add column {1} bigint" |
| .format(tbl_name, "j"), query_options=abort_ice_transaction_options) |
| ddl_result = str(ddl_err) |
| # Check that the error message looks reasonable. |
| assert error_msg_startswith(ddl_result, |
| "CommitFailedException: simulated commit failure") |
| # Check that no column was added. |
| data = self.execute_query_expect_success(self.client, |
| "select * from {0}".format(tbl_name)) |
| assert data.column_labels == ['I'] |
| assert len(data.data) == 1 |
| assert data.data[0] == '1' |
| |
| def test_drop_partition(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-drop-partition', vector, |
| use_db=unique_database) |
| |
| def test_rollback_after_drop_partition(self, unique_database): |
| table_name = "iceberg_drop_partition_rollback" |
| qualified_table_name = "{}.{}".format(unique_database, table_name) |
| create_table_stmt = """CREATE TABLE {}(identity_int int, unpartitioned_int int) |
| PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG""".format(qualified_table_name) |
| insert_into_stmt = """INSERT INTO {} values(1, 2)""".format(qualified_table_name) |
| drop_partition_stmt = """ALTER TABLE {} DROP PARTITION (identity_int = 1)""".format( |
| qualified_table_name) |
| |
| self.execute_query(create_table_stmt) |
| self.execute_query(insert_into_stmt) |
| self.execute_query(drop_partition_stmt) |
| |
| snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=2) |
| rollback = """ALTER TABLE {} EXECUTE ROLLBACK ({})""".format( |
| qualified_table_name, snapshots[0].get_snapshot_id()) |
| # Roll back to the snapshot taken before DROP PARTITION. |
| self.execute_query(rollback) |
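| # After the rollback a new snapshot entry appears that references the original |
| # snapshot: same id and parent, but a newer creation time. |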
| snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=3) |
| assert snapshots[0].get_snapshot_id() == snapshots[2].get_snapshot_id() |
| assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id() |
| assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time() |
| |
| def test_show_files_partition(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-show-files-partition', vector, |
| use_db=unique_database) |
| |
| def test_scan_metrics_in_profile_basic(self, vector): |
| self.run_test_case('QueryTest/iceberg-scan-metrics-basic', vector) |
| |
| |
| class TestIcebergV2Table(IcebergTestSuite): |
| """Tests related to Iceberg V2 tables.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestIcebergV2Table, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('table_format').file_format == 'parquet') |
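| # Run every test both with and without the optimized Iceberg V2 reader. |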
| add_exec_option_dimension(cls, 'disable_optimized_iceberg_v2_read', [0, 1]) |
| |
| def should_run_for_hive(self, vector): |
| # Hive interop tests are very slow. Only run them for a subset of dimensions. |
| if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 0: |
| return True |
| return False |
| |
| # The test uses pre-written Iceberg tables where the position delete files refer to |
| # the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the |
| # dockerised environment the namenode is accessible on a different hostname/port. |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_plain_count_star_optimization(self, vector): |
| self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization', |
| vector) |
| |
| def test_count_star_optimization_in_complex_query(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-v2-count-star-optimization-in-complex-query', |
| vector, unique_database) |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_read_position_deletes(self, vector): |
| # Remove the 'batch_size' option so we can set it in the .test file. |
| # Revisit this if the 'batch_size' dimension size increases. |
| vector.unset_exec_option('batch_size') |
| self.run_test_case('QueryTest/iceberg-v2-read-position-deletes', vector) |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_read_position_deletes_orc(self, vector): |
| self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc', vector) |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| @pytest.mark.execute_serially |
| def test_read_position_deletes_compute_stats(self, vector): |
| """Tests COMPUTE STATS on Iceberg V2 tables. Need to be executed serially |
| because it modifies tables that are used by other tests (e.g. multiple |
| instances of this test).""" |
| self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-stats', vector) |
| self.run_test_case('QueryTest/iceberg-v2-read-position-deletes-orc-stats', vector) |
| |
| @SkipIfFS.hive |
| def test_read_mixed_format_position_deletes(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-mixed-format-position-deletes', |
| vector, unique_database) |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_read_null_delete_records(self, vector): |
| expected_error = 'NULL found as file_path in delete file' |
| query_options = vector.get_value('exec_option') |
| v2_op_disabled = query_options['disable_optimized_iceberg_v2_read'] == 1 |
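| # The warning about NULL file_path entries is only required when the optimized |
| # V2 reader is used; the fallback path may not report it. |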
| result = self.execute_query( |
| 'select * from functional_parquet.iceberg_v2_null_delete_record', query_options) |
| assert len(result.data) == 6 |
| errors = result.log |
| print(errors) |
| assert expected_error in errors or v2_op_disabled |
| result = self.execute_query( |
| 'select count(*) from functional_parquet.iceberg_v2_null_delete_record', |
| query_options) |
| assert result.data == ['6'] |
| errors = result.log |
| assert expected_error in errors or v2_op_disabled |
| result = self.execute_query( |
| """select * from functional_parquet.iceberg_v2_null_delete_record |
| where j < 3""", query_options) |
| assert sorted(result.data) == ['1\t1', '2\t2'] |
| errors = result.log |
| assert expected_error in errors or v2_op_disabled |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_read_equality_deletes(self, vector): |
| self.run_test_case('QueryTest/iceberg-v2-read-equality-deletes', vector) |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_table_sampling_v2(self, vector): |
| self.run_test_case('QueryTest/iceberg-tablesample-v2', vector, |
| use_db="functional_parquet") |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_scan_metrics_in_profile_with_deletes(self, vector): |
| def get_latest_snapshot_id(fq_tbl_name): |
| query = ("select snapshot_id from {}.snapshots order by committed_at desc" |
| .format(fq_tbl_name)) |
| res = self.execute_query(query) |
| return res.data[0] |
| |
| ice_db = "functional_parquet" |
| |
| no_deletes = "{}.{}".format(ice_db, "iceberg_v2_no_deletes") |
| no_deletes_snapshot_id = get_latest_snapshot_id(no_deletes) |
| |
| pos_delete_all_rows = "{}.{}".format(ice_db, "iceberg_v2_positional_delete_all_rows") |
| pos_delete_all_rows_snapshot_id = get_latest_snapshot_id(pos_delete_all_rows) |
| |
| not_all_data_files_have_delete_files = "{}.{}".format( |
| ice_db, "iceberg_v2_positional_not_all_data_files_have_delete_files") |
| not_all_data_files_have_delete_files_snapshot_id = get_latest_snapshot_id( |
| not_all_data_files_have_delete_files) |
| |
| self.run_test_case('QueryTest/iceberg-scan-metrics-with-deletes', vector, |
| test_file_vars={ |
| "NO_DELETES_SNAPTHOT_ID": no_deletes_snapshot_id, |
| "POS_DELETE_ALL_ROWS_SNAPSHOT_ID": pos_delete_all_rows_snapshot_id, |
| "NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID": |
| not_all_data_files_have_delete_files_snapshot_id |
| }) |
| |
| @SkipIf.hardcoded_uris |
| def test_metadata_tables(self, vector, unique_database): |
| # Remove the 'batch_size' option so we can set it in the .test file. |
| # Revisit this if the 'batch_size' dimension size increases. |
| vector.unset_exec_option('batch_size') |
| with self.create_impala_client() as impalad_client: |
| overwrite_snapshot_id = impalad_client.execute("select snapshot_id from " |
| "functional_parquet.iceberg_query_metadata.snapshots " |
| "where operation = 'overwrite';") |
| overwrite_snapshot_ts = impalad_client.execute("select committed_at from " |
| "functional_parquet.iceberg_query_metadata.snapshots " |
| "where operation = 'overwrite';") |
| self.run_test_case('QueryTest/iceberg-metadata-tables', vector, |
| unique_database, |
| test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]), |
| '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])}) |
| |
| @SkipIf.not_hdfs |
| def test_missing_data_files(self, vector, unique_database): |
| def list_files(tbl): |
| query_result = self.execute_query("select file_path from {}.`files`".format(tbl)) |
| return query_result.data |
| |
| def first_snapshot(tbl): |
| query_result = self.execute_query( |
| "select snapshot_id from {}.`snapshots` order by committed_at".format(tbl)) |
| return query_result.data[0] |
| |
| def insert_values(tbl, values): |
| self.execute_query("insert into {} values {}".format(tbl, values)) |
| |
| missing_files_nopart = unique_database + ".missing_files_nopart" |
| missing_files_part = unique_database + ".missing_files_part" |
| self.execute_query("""CREATE TABLE {} (i int, p int) |
| STORED BY ICEBERG |
| TBLPROPERTIES('format-version'='2')""".format(missing_files_nopart)) |
| insert_values(missing_files_nopart, "(1, 1)") |
| first_file = set(list_files(missing_files_nopart)) |
| insert_values(missing_files_nopart, "(2, 2)") |
| |
| all_files = set(list_files(missing_files_nopart)) |
| assert len(all_files) == 2 |
| second_file = next(iter(all_files - first_file)) |
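| # Delete the file written by the second insert directly from HDFS so the Iceberg |
| # metadata still references a now-missing data file. |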
| check_output(["hdfs", "dfs", "-rm", second_file]) |
| |
| self.execute_query("""CREATE TABLE {} (i int, p int) |
| PARTITIONED BY SPEC (p) |
| STORED BY ICEBERG |
| TBLPROPERTIES('format-version'='2')""".format(missing_files_part)) |
| insert_values(missing_files_part, "(1, 1)") |
| insert_values(missing_files_part, "(2, 2)") |
| files = list_files(missing_files_part) |
| part_2_f = None |
| for f in files: |
| if "p=2" in f: |
| part_2_f = f |
| break |
| assert part_2_f is not None |
| check_output(["hdfs", "dfs", "-rm", part_2_f]) |
| |
| self.execute_query("invalidate metadata {}".format(missing_files_nopart)) |
| self.execute_query("invalidate metadata {}".format(missing_files_part)) |
| |
| self.run_test_case('QueryTest/iceberg-missing-data-files', vector, |
| unique_database, |
| test_file_vars={ |
| '$NOPART_FIRST_SNAPSHOT': first_snapshot(missing_files_nopart), |
| '$PART_FIRST_SNAPSHOT': first_snapshot(missing_files_part)}) |
| |
| def test_delete(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-delete', vector, |
| unique_database) |
| |
| def test_delete_partitioned(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-delete-partitioned', vector, |
| unique_database) |
| if IS_HDFS and self.should_run_for_hive(vector): |
| self._delete_partitioned_hive_tests(unique_database) |
| |
| def _delete_partitioned_hive_tests(self, db): |
| hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( |
| db, "id_part")) |
| assert hive_output == "id_part.i,id_part.s\n" |
| hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( |
| db, "trunc_part")) |
| assert hive_output == "trunc_part.i,trunc_part.s\n" |
| hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( |
| db, "multi_part")) |
| assert hive_output == "multi_part.i,multi_part.s,multi_part.f\n" |
| hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format( |
| db, "evolve_part")) |
| assert hive_output == \ |
| "evolve_part.i,evolve_part.s,evolve_part.f\n3,three,3.33\n" \ |
| "30,thirty,30.3\n40,forty,40.4\n" |
| hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}.{}".format( |
| db, "ice_store_sales")) |
| assert hive_output == "_c0\n2601498\n" |
| hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{}".format( |
| db, "ice_alltypes_part_v2")) |
| # Cut off the long header line. |
| hive_output = hive_output.split("\n", 1) |
| hive_output = hive_output[1] |
| assert hive_output == \ |
| "2,true,1,11,1.1,2.222,123.321,2022-02-22,impala\n" |
| |
| def test_large_scale_deletes(self, vector, unique_database): |
| if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 1: |
| pytest.skip("Only test the optimized v2 operator") |
| self.run_test_case('QueryTest/iceberg-large-scale-deletes', vector, |
| unique_database) |
| |
| @SkipIfFS.hive |
| def test_delete_hive_read(self, unique_database): |
| ice_delete = unique_database + ".ice_delete" |
| self.execute_query("""CREATE TABLE {} (i int, s string) |
| STORED BY ICEBERG |
| TBLPROPERTIES('format-version'='2')""".format(ice_delete)) |
| self.execute_query("INSERT INTO {} VALUES (1, 'one')".format(ice_delete)) |
| self.execute_query("INSERT INTO {} VALUES (2, 'two')".format(ice_delete)) |
| self.execute_query("INSERT INTO {} VALUES (3, 'three')".format(ice_delete)) |
| self.execute_query("DELETE FROM {} WHERE i = 2".format(ice_delete)) |
| |
| # Hive needs table property 'format-version' explicitly set |
| self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( |
| ice_delete)) |
| hive_output = self.run_stmt_in_hive("SELECT * FROM {} ORDER BY i".format(ice_delete)) |
| expected_output = "ice_delete.i,ice_delete.s\n1,one\n3,three\n" |
| assert hive_output == expected_output |
| |
| ice_lineitem = unique_database + ".lineitem_ice" |
| self.execute_query("""CREATE TABLE {} |
| STORED BY ICEBERG |
| TBLPROPERTIES('format-version'='2') |
| AS SELECT * FROM tpch_parquet.lineitem""".format(ice_lineitem)) |
| self.execute_query("DELETE FROM {} WHERE l_orderkey % 5 = 0".format(ice_lineitem)) |
| impala_result = self.execute_query("SELECT count(*) FROM {}".format(ice_lineitem)) |
| assert impala_result.data[0] == "4799964" |
| # Hive needs table property 'format-version' explicitly set |
| self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( |
| ice_lineitem)) |
| hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}".format(ice_lineitem)) |
| assert hive_output == "_c0\n4799964\n" |
| |
| @SkipIfFS.hive |
| def test_delete_complextypes_mixed_files(self, vector, unique_database): |
| ice_t = unique_database + ".ice_complex_delete" |
| self.run_stmt_in_hive("""create table {} |
| stored by iceberg stored as orc as |
| select * from functional_parquet.complextypestbl;""".format(ice_t)) |
| # Hive needs table property 'format-version' explicitly set |
| self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( |
| ice_t)) |
| self.run_stmt_in_hive("""alter table {} |
| set tblproperties ('write.format.default'='parquet')""".format(ice_t)) |
| self.run_stmt_in_hive("""insert into {} |
| select * from functional_parquet.complextypestbl""".format(ice_t)) |
| |
| vector.get_value('exec_option')['expand_complex_types'] = True |
| self.run_test_case('QueryTest/iceberg-delete-complex', vector, |
| unique_database) |
| hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t)) |
| # Test that Hive sees the same rows deleted. |
| assert hive_output == "id\n4\n5\n6\n7\n8\n" |
| |
| def test_update_basic(self, vector, unique_database): |
| udf_location = get_fs_path('/test-warehouse/libTestUdfs.so') |
| self.run_test_case('QueryTest/iceberg-update-basic', vector, |
| unique_database, test_file_vars={'UDF_LOCATION': udf_location}) |
| self._test_update_basic_snapshots(unique_database) |
| if IS_HDFS and self.should_run_for_hive(vector): |
| self._update_basic_hive_tests(unique_database) |
| |
| def _test_update_basic_snapshots(self, db): |
| """Verifies that the tables have the expected number of snapshots, and |
| the parent ids match the previous snapshot ids. See IMPALA-12708.""" |
| |
| self.validate_snapshots(db, "single_col", 3) |
| self.validate_snapshots(db, "ice_alltypes", 21) |
| self.validate_snapshots(db, "ice_id_partitioned", 7) |
| |
| def validate_snapshots(self, db, tbl, expected_snapshots): |
| tbl_name = "{}.{}".format(db, tbl) |
| snapshots = get_snapshots(self.client, tbl_name, |
| expected_result_size=expected_snapshots) |
| parent_id = None |
| for s in snapshots: |
| assert s.get_parent_id() == parent_id |
| parent_id = s.get_snapshot_id() |
| |
| def _update_basic_hive_tests(self, db): |
| def get_hive_results(tbl, order_by_col): |
| stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col) |
| return self.run_stmt_in_hive(stmt).split("\n", 1)[1] |
| |
| hive_results = get_hive_results("single_col", "i") |
| assert hive_results == "1\n3\n4\n" |
| |
| hive_results = get_hive_results("ice_alltypes", "bool_col") |
| assert hive_results == \ |
| "false,0,111,0.0,0.0,234,123.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \ |
| "true,3,222,1.0,1.0,NULL,NULL,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n" |
| |
| hive_results = get_hive_results("ice_id_partitioned", "i") |
| assert hive_results == \ |
| "1,0,APACHE IMPALA\n" \ |
| "2,0,iceberg\n" \ |
| "3,0,hive\n" \ |
| "5,2,Kudu\n" \ |
| "10,1,Apache Spark\n" |
| |
| hive_results = get_hive_results("ice_bucket_transform", "i") |
| assert hive_results == \ |
| "2,a fairly long string value,1000,1999-09-19 12:00:01.0\n" \ |
| "4,bbb,2030,2001-01-01 00:00:00.0\n" \ |
| "6,cccccccccccccccccccccccccccccccccccccccc,-123,2023-11-24 17:44:30.0\n" |
| |
| hive_results = get_hive_results("ice_time_transforms_timestamp", "id") |
| assert hive_results == \ |
| "1.5000,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0\n" \ |
| "2.4690,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0\n" \ |
| "1999.9998,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0\n" # noqa: E501 |
| |
| hive_results = get_hive_results("ice_time_transforms_date", "id") |
| assert hive_results == \ |
| "1.5000,2001-01-01,2001-01-01,2001-01-01\n" \ |
| "2.4690,2023-11-24,2023-11-24,2023-11-24\n" \ |
| "1999.9998,2199-12-31,2199-12-31,2199-12-31\n" |
| |
| hive_results = get_hive_results("ice_part_transforms", "i") |
| assert hive_results == \ |
| "1,2023-11-13 18:07:05.0,blue,1234\n" \ |
| "3,2023-11-14 19:07:05.0,green,1700\n" \ |
| "4,2023-11-13 18:07:23.0,gray,2500\n" \ |
| "8,2023-11-01 00:11:11.0,black,722\n" |
| |
| def test_update_partitions(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-update-partitions', vector, |
| unique_database) |
| if IS_HDFS and self.should_run_for_hive(vector): |
| self._update_partitions_hive_tests(unique_database) |
| |
| def _update_partitions_hive_tests(self, db): |
| def get_hive_results(tbl, order_by_col): |
| stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col) |
| return self.run_stmt_in_hive(stmt).split("\n", 1)[1] |
| |
| hive_results = get_hive_results("id_part", "i, s") |
| assert hive_results == \ |
| "2,FIVE\n" \ |
| "2,FOUR\n" \ |
| "3,SIX\n" \ |
| "5,ONE\n" \ |
| "10,TWO\n" \ |
| "15,THREE\n" |
| |
| hive_results = get_hive_results("trunc_part", "i") |
| assert hive_results == \ |
| "1,one\n" \ |
| "5,five\n" \ |
| "103,three\n" \ |
| "1004,FOURfour\n" \ |
| "1006,SIXsix\n" \ |
| "1102,TWOtwo\n" |
| |
| hive_results = get_hive_results("multi_part", "i") |
| assert hive_results == \ |
| "0,void,3.14\n" \ |
| "0,void,3.14\n" \ |
| "0,void,3.14\n" \ |
| "1,one,1.1\n" \ |
| "3,three,3.33\n" \ |
| "5,five,5.5\n" \ |
| "111,fox,1.1\n" |
| |
| hive_results = get_hive_results("evolve_part", "i") |
| assert hive_results == \ |
| "1,one,1.1\n" \ |
| "30,thirty,30.3\n" \ |
| "40,forty,40.4\n" \ |
| "50,fifty,50.5\n" \ |
| "1003,three,3.33\n" \ |
| "1010,ten,10.1\n" \ |
| "1020,twenty,20.2\n" \ |
| "1222,two,2.2\n" |
| |
| hive_results = get_hive_results("date_day_part", "i") |
| assert hive_results == \ |
| "11,1978-01-01\n" \ |
| "12,1979-12-31\n" \ |
| "13,1980-01-01\n" \ |
| "14,2033-11-15\n" |
| |
| hive_results = get_hive_results("ts_hour_part", "i") |
| assert hive_results == \ |
| "101,1958-01-01 01:02:03.0\n" \ |
| "102,1959-12-31 23:59:00.0\n" \ |
| "103,1960-01-01 00:00:00.0\n" \ |
| "104,2013-11-15 15:31:00.0\n" |
| |
| hive_results = get_hive_results("ts_evolve_part", "i") |
| assert hive_results == \ |
| "1001,1988-02-02 01:02:03.0\n" \ |
| "1002,1990-02-01 23:59:00.0\n" \ |
| "1003,1990-02-02 00:00:00.0\n" \ |
| "1004,2043-12-16 15:31:00.0\n" \ |
| "1111,NULL\n" |
| |
| hive_results = get_hive_results("numeric_truncate", "id") |
| assert hive_results == \ |
| "11,21,2111,531,75.20\n" |
| |
| # HIVE-28048: Hive cannot run ORDER BY queries for Iceberg tables partitioned by |
| # decimal columns, so we order the results ourselves. |
| hive_results = self.run_stmt_in_hive("SELECT * FROM {}.{}".format( |
| db, "ice_alltypes_part_v2")) |
| # Throw away the header line and sort the results. |
| hive_results = hive_results.split("\n", 1)[1] |
| hive_results = hive_results.strip().split("\n") |
| hive_results.sort() |
| assert hive_results == [ |
| "2,true,2,11,1.1,2.222,123.321,2022-04-22,impala", |
| "3,true,3,11,1.1,2.222,123.321,2022-05-22,impala"] |
| |
| def test_optimize(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database) |
| expected_snapshots = 19 |
| self.validate_snapshots(unique_database, "ice_optimize", expected_snapshots) |
| |
| # The last operation was an OPTIMIZE TABLE statement. |
| # Check that time travel to the previous snapshot returns all results correctly. |
| tbl_name = unique_database + ".ice_optimize" |
| snapshots = get_snapshots( |
| self.client, tbl_name, expected_result_size=expected_snapshots) |
| snapshot_before_last = snapshots[-2] |
| |
| result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name)) |
| result_time_travel = self.execute_query( |
| "select * from {0} for system_version as of {1};".format( |
| tbl_name, snapshot_before_last.get_snapshot_id())) |
| assert sorted(result_after_opt.data) == sorted(result_time_travel.data) |
| |
| def _check_file_filtering(self, tbl_name, threshold_mb, mode, had_partition_evolution): |
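| """Runs OPTIMIZE TABLE (file_size_threshold_mb=<threshold_mb>) on 'tbl_name' and |
| verifies via the table's `files` metadata that exactly the expected files were |
| rewritten. 'mode' describes the expected outcome: "NOOP", "PARTIAL" or |
| "REWRITE_ALL".""" |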
| threshold_bytes = threshold_mb * 1024 * 1024 |
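| # Values of the 'content' column in the Iceberg `files` metadata table: |
| # 0 = data file, 1 = (position) delete file. |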
| DATA_FILE = "0" |
| DELETE_FILE = "1" |
| FileMetadata = namedtuple('FileMetadata', 'content, path, partition, size') |
| metadata_query = """select content, file_path, `partition`, file_size_in_bytes |
| from {0}.`files`;""".format(tbl_name) |
| result = self.execute_query(metadata_query) |
| files_before = set() |
| files_per_partition = defaultdict(set) |
| for line in result.data: |
| file = FileMetadata._make(line.split("\t")) |
| partition = file.partition |
| files_per_partition[partition].add(file) |
| files_before.add(file) |
| |
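| # Re-derive the set of files that OPTIMIZE is expected to rewrite: for partitions |
| # with more than one file, select every file of a partition that has delete files, |
| # plus the small data files where a partition has at least two of them. |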
| selected_files = set() |
| partitions_with_removed_files = set() |
| for partition, files in files_per_partition.items(): |
| if len(files) > 1: |
| num_small_files = 0 |
| # Count the small data files in this partition. |
| for file in files: |
| if file.content == DATA_FILE and int(file.size) < threshold_bytes: |
| num_small_files += 1 |
| for file in files: |
| # We assume that a delete file in a partition references all data files in |
| # that partition, because we cannot differentiate between data files |
| # with/without deletes. |
| if file.content == DELETE_FILE: |
| selected_files.update(files) |
| partitions_with_removed_files.add(partition) |
| break |
| # Only merge small files if there are at least 2 of them. |
| elif num_small_files > 1 and int(file.size) < threshold_bytes: |
| selected_files.add(file) |
| partitions_with_removed_files.add(partition) |
| |
| self.execute_query( |
| "OPTIMIZE TABLE {0} (file_size_threshold_mb={1});".format(tbl_name, threshold_mb)) |
| optimized_result = self.execute_query(metadata_query) |
| files_after = set() |
| for line in optimized_result.data: |
| file = FileMetadata._make(line.split("\t")) |
| files_after.add(file) |
| |
| # Check the resulting files and the modified partitions after the OPTIMIZE operation. |
| # files_after = files_before - selected_files + 1 new file per partition |
| # The result should not contain the files that were selected and should contain 1 new |
| # file per written partition. |
| unchanged_files = files_before - selected_files |
| # Check that files that were not selected are still present in the result. |
| assert unchanged_files.issubset(files_after) |
| # Check that selected files are rewritten and not present in the result. |
| assert selected_files.isdisjoint(files_after) |
| new_files = files_after - unchanged_files |
| assert new_files == files_after - files_before |
| |
| if mode == "NOOP": |
| assert selected_files == set([]) |
| assert files_after == files_before |
| elif mode == "REWRITE_ALL": |
| assert selected_files == files_before |
| assert files_after.isdisjoint(files_before) |
| elif mode == "PARTIAL": |
| assert selected_files < files_before and selected_files != set([]) |
| assert unchanged_files < files_after and unchanged_files != set([]) |
| assert unchanged_files == files_after.intersection(files_before) |
| |
| # Check that all delete files were merged. |
| for file in files_after: |
| assert file.content == DATA_FILE |
| # Check that there's only one new file in every partition. |
| partitions_with_new_files = set() |
| for file in new_files: |
| assert file.partition not in partitions_with_new_files |
| partitions_with_new_files.add(file.partition) |
| assert len(new_files) == len(partitions_with_new_files) |
| |
| # With partition evolution only new partitions are written to, so the following |
| # per-partition checks only apply when there was no partition evolution. |
| if not had_partition_evolution: |
| # Check that 1 new content file is written in every updated partition. |
| assert len(new_files) == len(partitions_with_removed_files) |
| assert partitions_with_new_files == partitions_with_removed_files |
| |
| def test_optimize_file_filtering(self, unique_database): |
| tbl_name = unique_database + ".ice_optimize_filter" |
| self.execute_query("""CREATE TABLE {0} partitioned by spec (l_linenumber) |
| STORED BY ICEBERG TBLPROPERTIES ('format-version'='2') |
| AS SELECT * FROM tpch_parquet.lineitem |
| WHERE l_quantity < 10;""".format(tbl_name)) |
| self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem |
| WHERE l_quantity>=10 AND l_quantity<=12;""".format(tbl_name)) |
| # There are no delete files in the table, so this should be a no-op. Check that no new |
| # snapshot was created. |
| self._check_file_filtering(tbl_name, 0, "NOOP", False) |
| assert len(get_snapshots(self.client, tbl_name)) == 2 |
| self._check_file_filtering(tbl_name, 5, "PARTIAL", False) |
| self._check_file_filtering(tbl_name, 50, "PARTIAL", False) |
| # Check that the following is a no-op, since the table is already in a compact form. |
| self._check_file_filtering(tbl_name, 100, "NOOP", False) |
| self.execute_query("""UPDATE {0} SET l_linenumber=7 WHERE l_linenumber>4 AND |
| l_linestatus='F';""".format(tbl_name)) |
| self._check_file_filtering(tbl_name, 6, "PARTIAL", False) |
| self.execute_query("""ALTER TABLE {0} SET PARTITION SPEC(l_linestatus);""" |
| .format(tbl_name)) |
| self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL' |
| AND l_linenumber<4;""".format(tbl_name)) |
| self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem |
| WHERE l_quantity=13 AND l_linenumber<3;""".format(tbl_name)) |
| self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem |
| WHERE l_quantity=14 AND l_linenumber<3;""".format(tbl_name)) |
| # Merges the delete files and rewrites the small files. |
| self._check_file_filtering(tbl_name, 2, "PARTIAL", True) |
| # Rewrites the remaining small files (2MB <= file_size < 100MB). |
| self._check_file_filtering(tbl_name, 100, "PARTIAL", True) |
| self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE l_shipmode='MAIL';""" |
| .format(tbl_name)) |
| # All partitions have delete files, therefore the entire table is rewritten. |
| self._check_file_filtering(tbl_name, 100, "REWRITE_ALL", True) |
| |
| def test_merge(self, vector, unique_database): |
| udf_location = get_fs_path('/test-warehouse/libTestUdfs.so') |
| self.run_test_case('QueryTest/iceberg-merge', vector, unique_database, |
| test_file_vars={'UDF_LOCATION': udf_location}) |
| |
| def test_merge_partition(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-merge-partition', vector, unique_database) |
| |
| def test_merge_partition_sort(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-merge-partition-sort', vector, unique_database) |
| |
| def test_merge_long(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database) |
| |
| def test_merge_star(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database) |
| |
| def test_merge_equality_update(self, vector, unique_database): |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_v2_delete_equality_partitioned", "parquet", |
| table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", |
| warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) |
| self.run_test_case('QueryTest/iceberg-merge-equality-update', vector, unique_database) |
| |
| def test_merge_equality_insert(self, vector, unique_database): |
| create_iceberg_table_from_directory(self.client, unique_database, |
| "iceberg_v2_delete_equality_partitioned", "parquet", |
| table_location="${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice", |
| warehouse_prefix=os.getenv("FILESYSTEM_PREFIX")) |
| self.run_test_case('QueryTest/iceberg-merge-equality-insert', vector, unique_database) |
| |
| def test_merge_duplicate_check(self, vector, unique_database): |
| """Regression test for IMPALA-13932""" |
| # Remove the 'num_nodes' option so we can set it in the .test file. |
| vector.unset_exec_option('num_nodes') |
| self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector, unique_database) |
| |
| def test_writing_multiple_deletes_per_partition(self, vector, unique_database): |
| """Test writing multiple delete files partition in a single DELETE operation.""" |
| self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition', vector, |
| use_db=unique_database) |
| |
| def test_cleanup(self, unique_database): |
| """Test that all uncommitted files written by Impala are removed from the file |
| system when a DML commit to an Iceberg table fails, and that the effects of the |
| failed operation are not visible.""" |
| table_name = "iceberg_cleanup_failure" |
| fq_tbl_name = unique_database + "." + table_name |
| # The query options that inject an iceberg validation check failure. |
| fail_ice_commit_options = {'debug_action': |
| 'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@' |
| 'ValidationException@' |
| 'simulated validation check failure'} |
| # Create an iceberg table and insert a row. |
| self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i int) |
| STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')""".format(fq_tbl_name)) |
| self.execute_query_expect_success(self.client, |
| "insert into {0} values (1)".format(fq_tbl_name)) |
| |
| # Run a query that would update a row, but pass the query options that |
| # will cause the iceberg validation check to fail. |
| err = self.execute_query_expect_failure(self.client, |
| "update {0} set i=2 where i=1".format(fq_tbl_name), |
| query_options=fail_ice_commit_options) |
| # Check that we get the error message. |
| assert error_msg_startswith(str(err), |
| "ImpalaRuntimeException: simulated validation check failure\n" |
| "CAUSED BY: ValidationException: simulated validation check failure") |
| # Check that the table content was not updated. |
| data = self.execute_query_expect_success(self.client, |
| "select * from {0}".format(fq_tbl_name)) |
| assert len(data.data) == 1 |
| assert data.data[0] == '1' |
| |
| # Check that the uncommitted data and delete files are removed from the file system |
| # and only the first data file remains. |
| table_location = "{0}/test-warehouse/{1}.db/{2}/data".format( |
| FILESYSTEM_PREFIX, unique_database, table_name) |
| files_result = check_output(["hdfs", "dfs", "-ls", table_location]) |
| assert "Found 1 items" in bytes_to_str(files_result) |
| |
| def test_predicate_push_down_hint(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector, |
| use_db=unique_database) |
| |
| def test_partitions(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-partitions', vector, unique_database) |
| tbl_name = unique_database + ".ice_num_partitions" |
| snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4) |
| second_snapshot = snapshots[1] |
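| # Time travel scans report the total number of partitions as 'unknown' in the |
| # scan node's profile. |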
| time_travel_data = self.execute_query( |
| "SELECT * FROM {0} for system_version as of {1};".format( |
| tbl_name, second_snapshot.get_snapshot_id())) |
| assert "partitions=4/unknown" in time_travel_data.runtime_profile |
| selective_time_travel_data = self.execute_query( |
| "SELECT * FROM {0} for system_version as of {1} WHERE id < 5;".format( |
| tbl_name, second_snapshot.get_snapshot_id())) |
| assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile |
| |
| def test_table_repair(self, unique_database): |
| tbl_name = 'tbl_with_removed_files' |
| db_tbl = unique_database + "." + tbl_name |
| repair_query = "alter table {0} execute repair_metadata()" |
| with self.create_impala_client() as impalad_client: |
| impalad_client.execute( |
| "create table {0} (i int) stored as iceberg tblproperties('format-version'='2')" |
| .format(db_tbl)) |
| insert_q = "insert into {0} values ({1})" |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1)) |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2)) |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3)) |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4)) |
| self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5)) |
| result = impalad_client.execute('select i from {0} order by i'.format(db_tbl)) |
| assert result.data == ['1', '2', '3', '4', '5'] |
| |
| TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name) |
| DATA_PATH = os.path.join(TABLE_PATH, "data") |
| |
| # Check that the table remains intact if there are no missing files. |
| result = self.execute_query_expect_success( |
| impalad_client, repair_query.format(db_tbl)) |
| assert result.data[0] == "No missing data files detected." |
| result = impalad_client.execute('select i from {0} order by i'.format(db_tbl)) |
| assert result.data == ['1', '2', '3', '4', '5'] |
| |
| # Delete 2 data files from the file system directly to corrupt the table. |
| data_files = self.filesystem_client.ls(DATA_PATH) |
| self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[0]) |
| self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[1]) |
| self.execute_query_expect_success(impalad_client, "invalidate metadata") |
| result = self.execute_query_expect_success( |
| impalad_client, repair_query.format(db_tbl)) |
| assert result.data[0] == \ |
| "Iceberg table repaired by deleting 2 manifest entries of missing data files." |
| result = impalad_client.execute('select * from {0} order by i'.format(db_tbl)) |
| assert len(result.data) == 3 |
| |
| |
| # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note that most |
| # of the test coverage is in TestIcebergV2Table.test_read_position_deletes, but since |
| # that test also runs with the V2 optimizations turned off, some tests were moved here. |
| class TestIcebergDirectedMode(IcebergTestSuite): |
| """Tests related to Iceberg DIRECTED distribution mode.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestIcebergDirectedMode, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('table_format').file_format == 'parquet') |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_directed_mode(self, vector): |
| self.run_test_case('QueryTest/iceberg-v2-directed-mode', vector) |