# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from hive_metastore.ttypes import Database
from hive_metastore.ttypes import FieldSchema
from hive_metastore.ttypes import GetTableRequest
from hive_metastore.ttypes import GetPartitionsByNamesRequest
from hive_metastore.ttypes import TruncateTableRequest
from hive_metastore.ttypes import Table
from hive_metastore.ttypes import StorageDescriptor
from hive_metastore.ttypes import SerDeInfo
from tests.util.event_processor_utils import EventProcessorUtils
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.filesystem_utils import (IS_S3, IS_ADLS, IS_GCS)
class TestMetastoreService(CustomClusterTestSuite):
"""
Tests for the Catalog Metastore service. Each test in this class should
start a hms_server using the catalogd flag --start_hms_server=true
"""
part_tbl = ImpalaTestSuite.get_random_name("test_metastore_part_tbl")
unpart_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_tbl")
acid_part_tbl = ImpalaTestSuite.get_random_name("test_metastore_acid_part_tbl")
unpart_acid_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_acid_tbl")
default_unknowntbl = ImpalaTestSuite.get_random_name("test_metastore_default_tbl")
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899"
)
def test_passthrough_apis(self):
"""
This test exercises some of the Catalog HMS APIs which are directly
passed through to the backing HMS service. This is by no means an
    exhaustive set but merely used as a sanity check to make sure that a
hive_client can connect to the Catalog's metastore service and is
able to execute calls to the backing HMS service.
"""
catalog_hms_client = None
db_name = ImpalaTestSuite.get_random_name("test_passthrough_apis_db")
try:
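      # Connect a plain HMS Thrift client to catalogd's embedded HMS endpoint;
      # the port must match the --hms_port flag passed to catalogd above.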
catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_hms_client is not None
# get_databases
databases = catalog_hms_client.get_all_databases()
assert databases is not None
assert len(databases) > 0
assert "functional" in databases
# get_database
database = catalog_hms_client.get_database("functional")
assert database is not None
assert "functional" == database.name
# get_tables
tables = catalog_hms_client.get_tables("functional", "*")
assert tables is not None
assert len(tables) > 0
assert "alltypes" in tables
# get table
table = catalog_hms_client.get_table("functional", "alltypes")
assert table is not None
assert "alltypes" == table.tableName
assert table.sd is not None
# get partitions
partitions = catalog_hms_client.get_partitions("functional", "alltypes", -1)
assert partitions is not None
assert len(partitions) > 0
# get partition names
part_names = catalog_hms_client.get_partition_names("functional", "alltypes",
-1)
assert part_names is not None
assert len(part_names) > 0
assert "year=2009/month=1" in part_names
# notification APIs
event_id = EventProcessorUtils.get_current_notification_id(catalog_hms_client)
assert event_id is not None
assert event_id > 0
# DDLs
catalog_hms_client.create_database(self.__get_test_database(db_name))
database = catalog_hms_client.get_database(db_name)
assert database is not None
assert db_name == database.name
tbl_name = ImpalaTestSuite.get_random_name(
"test_passthrough_apis_tbl")
cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
part_cols = [["part", "string", "part col"]]
catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name,
cols, part_cols))
table = catalog_hms_client.get_table(db_name, tbl_name)
assert table is not None
assert tbl_name == table.tableName
self.__compare_cols(cols, table.sd.cols)
self.__compare_cols(part_cols, table.partitionKeys)
finally:
if catalog_hms_client is not None:
catalog_hms_client.drop_database(db_name, True, True)
catalog_hms_client.shutdown()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899"
)
def test_get_table_req_with_fallback(self):
"""
    Test the get_table_req APIs with fallback to HMS enabled. These calls
    succeed even if the catalog throws exceptions, since we fall back to HMS.
"""
catalog_hms_client = None
db_name = ImpalaTestSuite.get_random_name(
"test_get_table_req_with_fallback_db")
try:
catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_hms_client is not None
# Test simple get_table_req without stats.
get_table_request = GetTableRequest()
get_table_request.dbName = "functional"
get_table_request.tblName = "alltypes"
get_table_request.getFileMetadata = True
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == "functional"
assert get_table_response.table.tableName == "alltypes"
# Request did not ask for stats, verify colStats are not populated.
assert get_table_response.table.colStats is None
# assert that fileMetadata is in the response
self.__assert_filemd(get_table_response.table.fileMetadata,
get_table_response.table.dictionary)
# Test get_table_request with stats and engine set to Impala.
get_table_request.getColumnStats = True
get_table_request.engine = "impala"
get_table_request.getFileMetadata = False
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == "functional"
assert get_table_response.table.tableName == "alltypes"
assert get_table_response.table.colStats is not None
# Verify the column stats objects are populated for
# non-clustering columns.
assert len(get_table_response.table.colStats.statsObj) > 0
      # file-metadata is not requested and hence should not be set;
      # assert that fileMetadata is not in the response
self.__assert_no_filemd(get_table_response.table.fileMetadata,
get_table_response.table.dictionary)
# Create table in Hive and test this is properly falling back to HMS.
# Create table via Hive.
catalog_hms_client.create_database(self.__get_test_database(db_name))
database = catalog_hms_client.get_database(db_name)
assert database is not None
assert db_name == database.name
tbl_name = ImpalaTestSuite.get_random_name(
"test_get_table_req_with_fallback_tbl")
cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
get_table_request.dbName = db_name
get_table_request.tblName = tbl_name
get_table_request.getColumnStats = True
get_table_request.getFileMetadata = True
# Engine is not specified, this should throw an exception even if
# fallback_to_hms_on_errors is true.
expected_exception = "isGetColumnStats() is true in the request but " \
"engine is not specified."
      self.__call_get_table_req_expect_exception(catalog_hms_client,
          get_table_request, expected_exception)
# Set engine and request impala, this should fallback to HMS since
# the table is not in catalog cache. File metadata should be set even if the
# request is served via HMS fallback.
get_table_request.engine = "Impala"
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == db_name
assert get_table_response.table.tableName == tbl_name
assert get_table_response.table.fileMetadata is not None
# The table does not have any data so we expect the fileMetadata.data to be None
assert get_table_response.table.fileMetadata.data is None
      # even if there are no files, the object dictionary should be non-null and empty
assert get_table_response.table.dictionary is not None
assert len(get_table_response.table.dictionary.values) == 0
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
def __get_database_no_throw(self, db_name):
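    """Returns the database object from HMS, or None if it does not exist (or
    the lookup fails for any other reason)."""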
try:
return self.hive_client.get_database(db_name)
except Exception:
return None
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899 "
"--fallback_to_hms_on_errors=false"
)
def test_get_table_req_without_fallback(self):
"""
    Test the get_table_req APIs with fallback to HMS disabled. These calls
    throw exceptions if the db/table is not in the Catalog cache, since we do
    not fall back to HMS.
"""
catalog_hms_client = None
db_name = ImpalaTestSuite.get_random_name(
"test_get_table_req_without_fallback_db")
new_db_name = None
try:
catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_hms_client is not None
# Create table via Hive.
self.hive_client.create_database(self.__get_test_database(db_name))
database = self.hive_client.get_database(db_name)
assert database is not None
assert db_name == database.name
tbl_name = ImpalaTestSuite.get_random_name("test_get_table_req_tbl")
cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
self.hive_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
# Test get_table_request with stats and engine set to Impala.
# Test simple get_table_req without stats.
get_table_request = GetTableRequest()
get_table_request.dbName = db_name
get_table_request.tblName = tbl_name
get_table_request.getColumnStats = True
# Engine is not set, this should throw an exception without falling
# back to HMS.
expected_exception_str = "Column stats are requested " \
"but engine is not set in the request."
self.__call_get_table_req_expect_exception(catalog_hms_client,
get_table_request, expected_exception_str)
# Verify DB not found exception is thrown. Engine does not matter,
# we only return Impala table level column stats but this field is
# required to be consistent with Hive's implementation.
get_table_request.engine = "Impala"
expected_exception_str = "Database " + db_name + " not found"
self.__call_get_table_req_expect_exception(catalog_hms_client,
get_table_request, expected_exception_str)
# Create database in Impala
new_db_name = ImpalaTestSuite.get_random_name(
"test_get_table_req_without_fallback_db")
query = "create database " + new_db_name
self.execute_query_expect_success(self.client, query)
new_tbl_name = ImpalaTestSuite.get_random_name(
"test_get_table_req_without_fallback_tbl")
new_cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
# DDLs currently only pass-through to HMS so this call will not create a table
# in catalogd.
catalog_hms_client.create_table(self.__get_test_tbl(new_db_name, new_tbl_name,
new_cols))
# Verify table not found exception is thrown
get_table_request.dbName = new_db_name
get_table_request.tblName = new_tbl_name
expected_exception_str = "Table " + new_db_name + "." + new_tbl_name \
+ " not found"
self.__call_get_table_req_expect_exception(catalog_hms_client,
get_table_request, expected_exception_str)
# Create a table in Impala and verify that the get_table_req gets the
# table and stats for non-clustering columns immediately.
impala_tbl_name = ImpalaTestSuite.get_random_name(
"test_get_table_req_without_fallback_impala_tbl")
query = "create table " + new_db_name + "." + impala_tbl_name + \
"(id int) partitioned by (year int)"
self.execute_query_expect_success(self.client, query)
get_table_request.dbName = new_db_name
get_table_request.tblName = impala_tbl_name
get_table_request.getColumnStats = True
# Engine does not matter, we only return Impala table level column
# stats but this field is required to be consistent with Hive's
# implementation.
get_table_request.engine = "impala"
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == new_db_name
assert get_table_response.table.tableName == impala_tbl_name
assert get_table_response.table.colStats is not None
assert get_table_response.table.colStats.statsObj is not None
# Verify there is a ColumnStatisticsObj only for non-clustering columns
# (id in this case).
assert len(get_table_response.table.colStats.statsObj) == 1
assert get_table_response.table.colStats.statsObj[0].colName == "id"
# Verify a non-default catalog in the request throws an exception.
get_table_request.catName = "testCatalog"
expected_exception_str = "Catalog service does not support " \
"non-default catalogs"
self.__call_get_table_req_expect_exception(catalog_hms_client,
get_table_request, expected_exception_str)
# fetch file-metadata of a non-partitioned table
get_table_request = GetTableRequest()
get_table_request.dbName = "functional"
get_table_request.tblName = "tinytable"
get_table_request.getFileMetadata = True
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == "functional"
assert get_table_response.table.tableName == "tinytable"
self.__assert_filemd(get_table_response.table.fileMetadata,
get_table_response.table.dictionary)
# fetch file-metadata along with stats
get_table_request = GetTableRequest()
get_table_request.dbName = "functional"
get_table_request.tblName = "tinytable"
get_table_request.getFileMetadata = True
get_table_request.getColumnStats = True
get_table_request.engine = "impala"
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == "functional"
assert get_table_response.table.tableName == "tinytable"
self.__assert_filemd(get_table_response.table.fileMetadata,
get_table_response.table.dictionary)
assert get_table_response.table.colStats is not None
assert get_table_response.table.colStats.statsObj is not None
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
if self.__get_database_no_throw(new_db_name) is not None:
self.hive_client.drop_database(new_db_name, True, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899 "
"--fallback_to_hms_on_errors=false"
)
def test_get_partitions_by_names(self):
catalog_hms_client = None
try:
catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_hms_client is not None
valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
"year=2009/month=4"]
self.__run_partitions_by_names_tests(catalog_hms_client, "functional_parquet",
"alltypestiny", valid_part_names)
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899 "
"--fallback_to_hms_on_errors=true"
)
def test_fallback_get_partitions_by_names(self, unique_database):
"""
Test makes sure that the fall-back path is working in case of errors in catalogd's
    implementation of get_partitions_by_names_req.
"""
catalog_client = None
try:
catalog_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_client is not None
assert self.hive_client is not None
self.__create_test_tbls_from_hive(unique_database)
valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
"year=2009/month=4"]
self.__run_partitions_by_names_tests(catalog_client, unique_database,
TestMetastoreService.part_tbl, valid_part_names, True)
finally:
if catalog_client is not None:
catalog_client.shutdown()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899 "
"--fallback_to_hms_on_errors=true "
"--invalidate_hms_cache_on_ddls=true"
)
def test_cache_invalidated_on_nontransactional_table_ddls(self):
db_name = ImpalaTestSuite.get_random_name(
"test_cache_invalidated_on_nontransactional_table_ddls_db")
tbl_name = ImpalaTestSuite.get_random_name(
"test_cache_invalidated_on_nontransactional_table_ddls_tbl")
self.__test_non_transactional_table_cache_helper(db_name, tbl_name, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899 "
"--fallback_to_hms_on_errors=true "
"--invalidate_hms_cache_on_ddls=false"
)
def test_cache_valid_on_nontransactional_table_ddls(self):
db_name = ImpalaTestSuite.get_random_name(
"test_cache_valid_on_nontransactional_table_ddls_db")
tbl_name = ImpalaTestSuite.get_random_name(
"test_cache_valid_on_nontransactional_table_ddls_tbl")
self.__test_non_transactional_table_cache_helper(db_name, tbl_name, False)
def __test_non_transactional_table_cache_helper(self, db_name,
tbl_name, invalidateCache):
"""
    This tests the following DDLs after creating a table and loading it in the cache:
    1. Drop partition:
       If invalidateCache is True, the table should be removed from the cache
       and the subsequent get_partitions_by_names_req should load the table
       from HMS. If invalidateCache is False, the get_partitions_by_names_req
       should be served from the already cached table.
    2. Truncate table by removing a partition:
       If invalidateCache is True, the table should be removed from the cache,
       the subsequent get_partitions_by_names_req should load the table from
       HMS, and the file metadata for the truncated partition should be None.
       If invalidateCache is False, the get_partitions_by_names_req should be
       served from the already cached table, which has stale partition file
       metadata.
    3. Alter table rename:
       If invalidateCache is True, this should remove the old table from the
       cache and add the new table. A subsequent get_table_req on the old table
       should throw an exception. If invalidateCache is False, the old table
       should still be present in the cache and a subsequent get_table_req
       should serve the old table from the cache.
    4. Alter table add column:
       If invalidateCache is True, the table should be invalidated from the
       cache and the subsequent get_table_req should reload the table from HMS.
       If invalidateCache is False, the get_table_req should be served from the
       already cached table, which has the stale column info.
    5. Drop table:
       If invalidateCache is True, the table is removed from the cache.
       If invalidateCache is False, the table is *not* removed from the cache.
"""
catalog_hms_client = None
cur_get_table_response = None
prev_get_table_response = None
try:
catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_hms_client is not None
create_database_query = "create database " + db_name
self.execute_query_expect_success(self.client, create_database_query)
database = catalog_hms_client.get_database(db_name)
assert database is not None
assert db_name == database.name
create_tbl_query = "create table " + db_name + "." + tbl_name + \
"(c1 int) partitioned by (part_col int) "
self.execute_query_expect_success(self.client, create_tbl_query)
insert_queries = [
"insert into {0}.{1} PARTITION (part_col=1) VALUES (1)".format(
db_name, tbl_name),
"insert into {0}.{1} PARTITION (part_col=2) VALUES (2)".format(
db_name, tbl_name),
"insert into {0}.{1} PARTITION (part_col=3) VALUES (3)".format(
db_name, tbl_name)
]
for query in insert_queries:
self.execute_query_expect_success(self.client, query)
get_table_request = self.__create_get_table_hms_request(
db_name, tbl_name, True, False, "impala")
cur_get_table_response = catalog_hms_client.get_table_req(
get_table_request)
assert cur_get_table_response.table is not None
assert cur_get_table_response.table.dbName == db_name
assert cur_get_table_response.table.tableName == tbl_name
partitions_response = catalog_hms_client.get_partition_names(
db_name, tbl_name, -1)
assert partitions_response is not None
assert len(partitions_response) == 3
# drop a partition
catalog_hms_client.drop_partition_by_name(
db_name, tbl_name, "part_col=3", True)
# save cur_get_table_response in prev_get_table_response
# before calling get_table_req HMS api
prev_get_table_response = cur_get_table_response
new_get_table_request = self.__create_get_table_hms_request(
db_name, tbl_name, True, False, "impala")
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
assert cur_get_table_response.table is not None
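      # Request all three original partitions; whether the dropped one is
      # returned depends on whether the cached table was invalidated below.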
part_col_names = ["part_col=1", "part_col=2", "part_col=3"]
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
get_parts_req.names = part_col_names
get_parts_req.getFileMetadata = True
parts_response = catalog_hms_client.get_partitions_by_names_req(
get_parts_req)
      if invalidateCache:
        # drop_partition_by_name HMS API should invalidate the
        # table from the cache and reload the new table from HMS
        assert len(parts_response.partitions) == 2
      else:
        # table should be served from the cache
        # and the cached table has 3 partitions
        assert len(parts_response.partitions) == 3
# Truncate table by removing a partition
part_to_truncate = ["part_col=2"]
truncate_table_req = TruncateTableRequest()
truncate_table_req.dbName = db_name
truncate_table_req.tableName = tbl_name
truncate_table_req.partNames = part_to_truncate
catalog_hms_client.truncate_table_req(truncate_table_req)
      # Check the partition's file metadata after truncating it: for a table
      # invalidated from the cache the partition file metadata is None,
      # whereas it is non-None for the stale table still present in the cache.
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
get_parts_req.names = part_to_truncate
get_parts_req.getFileMetadata = True
parts_response = catalog_hms_client.get_partitions_by_names_req(
get_parts_req)
assert len(parts_response.partitions) == 1
for part in parts_response.partitions:
assert part.fileMetadata is not None
if invalidateCache:
assert part.fileMetadata.data is None
else:
assert part.fileMetadata.data is not None
# alter current table by renaming
# the table name
table_to_update = self.__deepCopyTable(cur_get_table_response.table)
new_table_name = tbl_name + "_new"
# update table name
table_to_update.tableName = new_table_name
# alter the table
catalog_hms_client.alter_table(db_name, tbl_name, table_to_update)
prev_get_table_response = cur_get_table_response
expected_exception = None
# create get request for old table
old_table_get_request = self.__create_get_table_hms_request(
db_name, tbl_name, True, False, "impala")
if invalidateCache:
# get_table_req on old table should throw exception
# because it neither exists in cache nor in HMS
try:
cur_get_table_response = catalog_hms_client.get_table_req(
old_table_get_request)
except Exception as e:
expected_exception = e
assert expected_exception is not None
else:
# old table still exists in cache
cur_get_table_response = catalog_hms_client.get_table_req(
old_table_get_request)
assert expected_exception is None
# make sure that new table exists in cache
self.execute_query_expect_success(
self.client, "invalidate metadata " + db_name + "." + new_table_name)
# get_table_req on new table should return table
new_get_table_request = self.__create_get_table_hms_request(
db_name, new_table_name, True, False, "impala")
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
assert cur_get_table_response.table is not None
assert cur_get_table_response.table.sd is not None
table_to_update = self.__deepCopyTable(cur_get_table_response.table)
updated_cols = [["c1", "int", ""], ["c2", "int", ""]]
# alter new table by adding new column
table_to_update.sd.cols = self.__create_field_schemas(updated_cols)
catalog_hms_client.alter_table(db_name, new_table_name, table_to_update)
# get table
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
if invalidateCache:
        # new get_table_req should load the table from HMS again because the
        # table was invalidated from the catalogd cache
self.__compare_cols(updated_cols,
cur_get_table_response.table.sd.cols, False)
else:
# new_get_table_req should load the existing table from cache
# so the expected cols should be old ones
expected_cols = [["c1", "int", ""]]
self.__compare_cols(
expected_cols, cur_get_table_response.table.sd.cols, False)
prev_get_table_response = cur_get_table_response
# drop table
catalog_hms_client.drop_table(db_name, new_table_name, True)
new_get_table_request = self.__create_get_table_hms_request(
db_name, new_table_name, True, False, "impala")
if invalidateCache:
# new get_table_req should throw an exception
# since the table does not exist in cache as well as in HMS
expected_exception = None
try:
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
except Exception as e:
expected_exception = e
assert expected_exception is not None
else:
# new get_table_req should return the
# table from the cache
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
assert cur_get_table_response.table is not None
assert prev_get_table_response.table == cur_get_table_response.table
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal "
"--start_hms_server=true "
"--hms_port=5899 "
"--fallback_to_hms_on_errors=true "
"--invalidate_hms_cache_on_ddls=true "
"--hms_event_polling_interval_s=1"
)
def test_cache_invalidate_incomplete_table(self):
"""
    Tests the cache invalidation on an incomplete table,
    i.e. a table that is not fully loaded in the cache.
"""
db_name = "test_cache_invalidate_incomplete_table_db"
tbl_name = "test_cache_invalidate_incomplete_table_tbl"
catalog_hms_client = None
try:
catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
assert catalog_hms_client is not None
create_database_query = "create database " + db_name
self.execute_query_expect_success(self.client, create_database_query)
database = catalog_hms_client.get_database(db_name)
assert database is not None
assert db_name == database.name
create_tbl_query = "create table " + db_name + "." + tbl_name + \
"(c1 int) partitioned by (part_col int) "
self.execute_query_expect_success(self.client, create_tbl_query)
insert_queries = [
"insert into {0}.{1} PARTITION (part_col=1) VALUES (1)".format(
db_name, tbl_name),
"insert into {0}.{1} PARTITION (part_col=2) VALUES (2)".format(
db_name, tbl_name)
]
for query in insert_queries:
self.execute_query_expect_success(self.client, query)
get_table_request = self.__create_get_table_hms_request(
db_name, tbl_name, True, False, "impala")
cur_get_table_response = catalog_hms_client.get_table_req(
get_table_request)
assert cur_get_table_response.table is not None
assert cur_get_table_response.table.dbName == db_name
assert cur_get_table_response.table.tableName == tbl_name
partitions_response = catalog_hms_client.get_partition_names(
db_name, tbl_name, -1)
assert partitions_response is not None
assert len(partitions_response) == 2
invalidate_tbl_query = "invalidate metadata " + \
db_name + "." + tbl_name
      # now invalidate the table metadata so that we have an
      # incomplete table in the cache
self.execute_query_expect_success(self.client,
invalidate_tbl_query)
# drop a partition on an Incomplete table
catalog_hms_client.drop_partition_by_name(
db_name, tbl_name, "part_col=2", True)
new_get_table_request = self.__create_get_table_hms_request(
db_name, tbl_name, True, False, "impala")
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
assert cur_get_table_response.table is not None
part_col_names = ["part_col=1", "part_col=2"]
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
get_parts_req.names = part_col_names
parts_response = catalog_hms_client.get_partitions_by_names_req(
get_parts_req)
assert len(parts_response.partitions) == 1
# invalidate the table again before dropping it
self.execute_query_expect_success(self.client, invalidate_tbl_query)
catalog_hms_client.drop_table(db_name, tbl_name, True)
# new get_table_req should throw an exception
# since the table does not exist in cache as well as in HMS
expected_exception = None
try:
cur_get_table_response = catalog_hms_client.get_table_req(
new_get_table_request)
except Exception as e:
expected_exception = e
assert expected_exception is not None
finally:
if catalog_hms_client is not None:
catalog_hms_client.shutdown()
if self.__get_database_no_throw(db_name) is not None:
self.hive_client.drop_database(db_name, True, True)
def __create_get_table_hms_request(self, db_name, tbl_name, get_column_stats=False,
get_file_metadata=False, engine="impala"):
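    """Builds a GetTableRequest for the given table. Note that engine only
    matters when column stats are requested (see the engine-related assertions
    in the tests above)."""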
new_get_table_request = GetTableRequest()
new_get_table_request.dbName = db_name
new_get_table_request.tblName = tbl_name
new_get_table_request.getColumnStats = get_column_stats
new_get_table_request.getFileMetadata = get_file_metadata
new_get_table_request.engine = engine
return new_get_table_request
def __create_test_tbls_from_hive(self, db_name):
"""Util method to create test tables from hive in the given database. It creates
4 tables (partitioned and unpartitioned) for non-acid and acid cases and returns
the valid partition names for the partitioned tables."""
# create a partitioned table
    # Creating a table from Hive and inserting into it takes a very long time.
    # Hence we create an external table pointing to an existing table location.
tbl_location = self.hive_client.get_table("functional", "alltypessmall").sd.location
self.run_stmt_in_hive(
"create external table {0}.{1} like functional.alltypessmall location '{2}'"
.format(db_name, TestMetastoreService.part_tbl, tbl_location))
self.run_stmt_in_hive(
"msck repair table {0}.{1}".format(db_name, TestMetastoreService.part_tbl))
    # TODO: create an ACID partitioned table
tbl_location = self.hive_client.get_table("functional", "tinytable").sd.location
self.run_stmt_in_hive(
"create external table {0}.{1} like functional.tinytable location '{2}'"
.format(db_name, TestMetastoreService.unpart_tbl, tbl_location))
self.run_stmt_in_hive(
"create table default.{0} (c1 int)"
.format(TestMetastoreService.default_unknowntbl))
@classmethod
def __run_get_table_tests(cls, catalog_hms_client, db_name, tbl_name,
fallback_expected=False):
"""
Issues get_table_req for various cases and validates the response.
"""
# Test simple get_table_req without stats.
get_table_request = GetTableRequest()
get_table_request.dbName = db_name
get_table_request.tblName = tbl_name
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == db_name
assert get_table_response.table.tableName == tbl_name
# Request did not ask for stats, verify colStats are not populated.
assert get_table_response.table.colStats is None
# Test get_table_request with stats and engine set to Impala.
get_table_request.getColumnStats = True
get_table_request.engine = "impala"
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == db_name
assert get_table_response.table.tableName == tbl_name
assert get_table_response.table.colStats is not None
# Verify the column stats objects are populated for
# non-clustering columns.
assert len(get_table_response.table.colStats.statsObj) > 0
# We only return Impala table level column stats but engine field is
# required to be consistent with Hive's implementation.
get_table_request.engine = "hive"
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == db_name
assert get_table_response.table.tableName == tbl_name
assert get_table_response.table.colStats is not None
# Verify the column stats objects are populated for
# non-clustering columns.
assert len(get_table_response.table.colStats.statsObj) > 0
    # requests for tables which are unknown to catalogd should fall back to HMS
    # if fallback_expected is True
get_table_request.dbName = TestMetastoreService.test_db_name
get_table_request.tblName = TestMetastoreService.unpart_tbl
get_table_request.getColumnStats = True
# Engine is not specified, this should throw an exception even if
# fallback_to_hms_on_errors is true.
expected_exception = "isGetColumnStats() is true in the request but " \
"engine is not specified."
    cls.__call_get_table_req_expect_exception(catalog_hms_client,
        get_table_request, expected_exception)
# Set engine and request impala, this should fallback to HMS since
# the table is not in catalog cache.
get_table_request.engine = "Impala"
if fallback_expected:
get_table_response = catalog_hms_client.get_table_req(get_table_request)
assert get_table_response.table.dbName == db_name
assert get_table_response.table.tableName == tbl_name
else:
expected_exception = None
try:
catalog_hms_client.get_table_req(get_table_request)
except Exception as e:
expected_exception = e
assert expected_exception is not None
def __run_partitions_by_names_tests(self, catalog_hms_client, db_name, tbl_name,
valid_part_names, expect_fallback=False):
"""This method runs all the test cases for fetching partitions. The method
expects that the partition keys are year and month for the given table and
that there are atleast 3 partitions in the table."""
# Test get_partition_by_name_req
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
# must have more than 3 partitions to test the filtering below
assert len(valid_part_names) > 3
part_names = valid_part_names[:3]
part_name_format = "year={0}/month={1}"
get_parts_req.names = part_names
response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
assert response.partitions is not None
assert len(response.partitions) == 3
self.__validate_partitions(part_names, response, part_name_format)
# request partitions with file-metadata
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
get_parts_req.names = part_names
get_parts_req.getFileMetadata = True
response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
assert response.partitions is not None
assert len(response.partitions) == 3
self.__validate_partitions(part_names, response, part_name_format,
True)
# request contains unavailable partitions
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
part_names = [x for x in valid_part_names]
part_names[0] = "year=2009/month=13"
get_parts_req.names = part_names
get_parts_req.getFileMetadata = True
assert get_parts_req.names is not None
response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
assert response.partitions is not None
assert len(response.partitions) == 3
# remove the bad partname for comparison below
part_names.pop(0)
self.__validate_partitions(part_names, response, part_name_format, True)
# request contains empty partition names
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
get_parts_req.names = []
response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
assert response.partitions is not None
assert len(response.partitions) == 0
# Negative test cases
# invalid partition keys
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = tbl_name
get_parts_req.names = ["invalid-key=1"]
response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
# in this case we replicate what HMS does which is to return 0 partitions
assert len(response.partitions) == 0
# empty table name
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = ""
get_parts_req.names = []
if expect_fallback:
self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
"NoSuchObjectException")
else:
self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
"Table name is empty or null")
# empty db name
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = ""
get_parts_req.tbl_name = tbl_name
get_parts_req.names = []
if expect_fallback:
self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
"NoSuchObjectException")
else:
self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
"Database name is empty or null")
# table does not exist
get_parts_req = GetPartitionsByNamesRequest()
get_parts_req.db_name = db_name
get_parts_req.tbl_name = "table-does-not-exist"
get_parts_req.names = []
if expect_fallback:
      # TODO: HMS actually throws an InvalidObjectException but the HMS API
      # doesn't declare it in the signature.
self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
"Internal error")
else:
self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
"Table {0}.table-does-not-exist not found".format(
db_name))
@classmethod
def __validate_partitions(cls, part_names, get_parts_by_names_result, name_format,
expect_files=False):
"""
Validates that the given list of partitions contains all the partitions from the
given list of partition names.
"""
test_part_names = list(part_names)
if expect_files:
assert get_parts_by_names_result.dictionary is not None
# obj_dict will only be populated when the table is on HDFS
# where block locations are available.
if not IS_S3 and not IS_GCS and not IS_ADLS:
assert len(get_parts_by_names_result.dictionary.values) > 0
else:
assert get_parts_by_names_result.dictionary is None
partitions = get_parts_by_names_result.partitions
for part in partitions:
assert part is not None
      # we create the part name here since the partition object only has the values
assert len(part.values) == 2
name = name_format.format(part.values[0], part.values[1])
assert name in test_part_names
if expect_files:
assert part.fileMetadata is not None
assert part.fileMetadata.data is not None
else:
assert part.fileMetadata is None
test_part_names.remove(name)
assert len(test_part_names) == 0
def __assert_filemd(self, filemetadata, obj_dict):
"""
Util method which asserts that the given file-metadata is valid.
"""
assert filemetadata is not None
assert filemetadata.data is not None
assert obj_dict is not None
# obj_dict will only be populated when the table is on HDFS
# where block locations are available.
if not IS_S3 and not IS_GCS and not IS_ADLS:
assert len(obj_dict.values) > 0
def __assert_no_filemd(self, filemetadata, obj_dict):
"""
    Util method which asserts that the given file-metadata is not set. Used to
    verify that the HMS response objects do not contain the file-metadata.
"""
assert filemetadata is None
assert obj_dict is None
@classmethod
def __call_get_table_req_expect_exception(cls, client,
get_table_request, expected_exception_str=None):
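    """Calls get_table_req with the given request and asserts that it throws
    an exception containing expected_exception_str (when provided)."""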
    exception_received = None
    try:
      client.get_table_req(get_table_request)
    except Exception as e:
      exception_received = e
      if expected_exception_str is not None:
        assert expected_exception_str in str(e)
    assert exception_received is not None
@classmethod
  def __get_parts_by_names_expect_exception(cls, catalog_hms_client,
      request, expected_exception_str=None):
"""
    Calls get_partitions_by_names_req on the HMS client and expects it to fail
    with the exception message provided in expected_exception_str.
"""
exception = None
try:
catalog_hms_client.get_partitions_by_names_req(request)
except Exception as e:
exception = e
if expected_exception_str is not None:
assert expected_exception_str in str(e)
assert exception is not None
def __compare_cols(self, cols, fieldSchemas, compareComments=True):
"""
Compares the given list of fieldSchemas with the expected cols
"""
assert len(cols) == len(fieldSchemas)
for i in range(len(cols)):
assert cols[i][0] == fieldSchemas[i].name
assert cols[i][1] == fieldSchemas[i].type
if compareComments:
assert cols[i][2] == fieldSchemas[i].comment
def __get_test_database(self, db_name,
description="test_db_for_metastore_service"):
db = Database()
db.name = db_name
    db.description = description
return db
def __get_test_tbl(self, db_name, tbl_name, cols, part_cols=None):
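    """Builds a Thrift Table backed by a Parquet StorageDescriptor. cols and
    part_cols are lists of [name, type, comment] triples, e.g.
    __get_test_tbl("db1", "tbl1", [["c1", "int", "col 1"]])."""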
tbl = Table()
tbl.dbName = db_name
tbl.tableName = tbl_name
sd = StorageDescriptor()
tbl.sd = sd
sd.cols = self.__create_field_schemas(cols)
sd.inputFormat = \
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
sd.outputFormat = \
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
if part_cols is not None:
tbl.partitionKeys = self.__create_field_schemas(part_cols)
serde = SerDeInfo()
serde.name = ""
serde.serializationLib = \
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
tbl.sd.serdeInfo = serde
return tbl
def __create_field_schemas(self, cols):
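    """Converts a list of [name, type, comment] triples into FieldSchema objects."""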
fieldSchemas = []
for col in cols:
f = FieldSchema()
f.name = col[0]
f.type = col[1]
f.comment = col[2]
fieldSchemas.append(f)
return fieldSchemas
def __deepCopyTable(self, other):
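    """Copies a Thrift Table field by field. Nested objects such as sd and
    partitionKeys are copied by reference, so this is effectively a shallow
    copy."""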
tbl = Table()
tbl.tableName = other.tableName
tbl.dbName = other.dbName
tbl.owner = other.owner
tbl.createTime = other.createTime
tbl.lastAccessTime = other.lastAccessTime
tbl.retention = other.retention
tbl.sd = other.sd
tbl.partitionKeys = other.partitionKeys
tbl.parameters = other.parameters
tbl.viewOriginalText = other.viewOriginalText
tbl.viewExpandedText = other.viewExpandedText
tbl.tableType = other.tableType
tbl.privileges = other.privileges
tbl.temporary = other.temporary
tbl.rewriteEnabled = other.rewriteEnabled
tbl.creationMetadata = other.creationMetadata
tbl.catName = other.catName
tbl.ownerType = other.ownerType
tbl.writeId = other.writeId
tbl.isStatsCompliant = other.isStatsCompliant
tbl.colStats = other.colStats
tbl.accessType = other.accessType
tbl.requiredReadCapabilities = other.requiredReadCapabilities
tbl.requiredWriteCapabilities = other.requiredWriteCapabilities
tbl.id = other.id
tbl.fileMetadata = other.fileMetadata
tbl.dictionary = other.dictionary
return tbl