| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| import os |
| |
| from hmsclient.genthrift.hive_metastore.ttypes import LockResponse, LockState, NoSuchObjectException |
| from iceberg.exceptions import AlreadyExistsException |
| from iceberg.hive import HiveTables |
| import mock |
| import pytest |
| from pytest import raises |
| |
| |
| class MockHMSTable(object): |
| def __init__(self, params): |
| self.parameters = params |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_load_tables_check_valid_props(client, current_call, refresh_call): |
| parameters = {"table_type": "ICEBERG", |
| "partition_spec": [], |
| "metadata_location": "s3://path/to/iceberg.metadata.json"} |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(parameters) |
| |
| conf = {"hive.metastore.uris": 'thrift://hms:port'} |
| tables = HiveTables(conf) |
| tables.load("test.test_123") |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_load_tables_check_missing_iceberg_type(client, current_call, refresh_call): |
| parameters = {"partition_spec": [], |
| "metadata_location": "s3://path/to/iceberg.metdata.json"} |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(parameters) |
| |
| conf = {"hive.metastore.uris": 'thrift://hms:port'} |
| tables = HiveTables(conf) |
| with raises(RuntimeError): |
| tables.load("test.test_123") |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_load_tables_check_non_iceberg_type(client, current_call, refresh_call): |
| parameters = {"table_type": "HIVE", |
| "partition_spec": [], |
| "metadata_location": "s3://path/to/iceberg.metdata.json"} |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(parameters) |
| |
| conf = {"hive.metastore.uris": 'thrift://hms:port'} |
| tables = HiveTables(conf) |
| with raises(RuntimeError): |
| tables.load("test.test_123") |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_load_tables_check_none_table_type(client, current_call, refresh_call): |
| parameters = {"table_type": None, |
| "partition_spec": [], |
| "metadata_location": "s3://path/to/iceberg.metdata.json"} |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(parameters) |
| |
| conf = {"hive.metastore.uris": 'thrift://hms:port'} |
| tables = HiveTables(conf) |
| with raises(RuntimeError): |
| tables.load("test.test_123") |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_load_tables_check_no_location(client, current_call, refresh_call): |
| parameters = {"table_type": "ICEBERG", |
| "partition_spec": []} |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(parameters) |
| |
| conf = {"hive.metastore.uris": 'thrift://hms:port'} |
| tables = HiveTables(conf) |
| with raises(RuntimeError): |
| tables.load("test.test_123") |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_load_tables_check_none_location(client, current_call, refresh_call): |
| parameters = {"table_type": "ICEBERG", |
| "partition_spec": [], |
| "metadata_location": None} |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(parameters) |
| |
| conf = {"hive.metastore.uris": 'thrift://hms:port'} |
| tables = HiveTables(conf) |
| with raises(RuntimeError): |
| tables.load("test.test_123") |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location") |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_create_tables_failed(client, current_call, refresh_call, base_scan_schema, base_scan_partition, tmpdir): |
| client.return_value.__enter__.return_value.get_table.side_effect = NoSuchObjectException() |
| current_call.return_value = None |
| conf = {"hive.metastore.uris": 'thrift://hms:port', |
| "hive.metastore.warehouse.dir": tmpdir} |
| tables = HiveTables(conf) |
| with pytest.raises(AlreadyExistsException): |
| tables.create(base_scan_schema, "test.test_123", base_scan_partition) |
| assert len(os.listdir(os.path.join(tmpdir, "test.db", "test_123", "metadata"))) == 0 |
| |
| |
| @mock.patch("iceberg.hive.HiveTableOperations.current") |
| @mock.patch("iceberg.hive.HiveTables.get_client") |
| def test_create_tables(client, current_call, base_scan_schema, base_scan_partition, tmpdir): |
| |
| client.return_value.__enter__.return_value.get_table.side_effect = NoSuchObjectException() |
| current_call.return_value = None |
| client.return_value.__enter__.return_value.lock.return_value = LockResponse("x", LockState.WAITING) |
| client.return_value.__enter__.return_value.check_lock.return_value = LockResponse("x", LockState.ACQUIRED) |
| tbl = client.return_value.__enter__.return_value.create_table.call_args_list |
| conf = {"hive.metastore.uris": 'thrift://hms:port', |
| "hive.metastore.warehouse.dir": tmpdir} |
| tables = HiveTables(conf) |
| tables.create(base_scan_schema, "test.test_123", base_scan_partition) |
| |
| client.return_value.__enter__.return_value.get_table.return_value = MockHMSTable(tbl[0].args[0].parameters) |
| client.return_value.__enter__.return_value.get_table.side_effect = None |
| current_call.return_value = tbl[0].args[0].parameters['metadata_location'] |
| |
| tables.load("test.test_123") |