blob: 96c87179dbfb5c1bedc606a6c499a5939167b729 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import pytest
from kudu.schema import INT32
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.kudu_test_suite import KuduTestSuite
from tests.common.skip import SkipIfKudu, SkipIfHive3
from tests.common.test_dimensions import add_exec_option_dimension
KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
LOG = logging.getLogger(__name__)
class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestKuduOperations, cls).add_test_dimensions()
# The default read mode of READ_LATEST does not provide high enough consistency for
# these tests.
add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args=\
"--use_local_tz_for_unix_timestamp_conversions=true")
@SkipIfKudu.no_hybrid_clock
@SkipIfKudu.hms_integration_enabled
def test_local_tz_conversion_ops(self, vector, unique_database):
"""IMPALA-5539: Test Kudu timestamp reads/writes are correct with the
use_local_tz_for_unix_timestamp_conversions flag."""
# These tests provide enough coverage of queries with timestamps.
self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database)
self.run_test_case('QueryTest/kudu_insert', vector, use_db=unique_database)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=")
@SkipIfKudu.hms_integration_enabled
def test_kudu_master_hosts(self, cursor, kudu_client):
"""Check behavior when -kudu_master_hosts is not provided to catalogd."""
with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
table_name = self.get_kudu_table_base_name(kudu_table.name)
props = "TBLPROPERTIES('kudu.table_name'='%s')" % (kudu_table.name)
try:
cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (table_name,
props))
assert False
except Exception as e:
assert "Table property 'kudu.master_addresses' is required" in str(e)
cursor.execute("""
CREATE EXTERNAL TABLE %s STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses' = '%s',
'kudu.table_name'='%s')
""" % (table_name, KUDU_MASTER_HOSTS, kudu_table.name))
cursor.execute("DROP TABLE %s" % table_name)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="-kudu_error_buffer_size=1024")
@SkipIfKudu.hms_integration_enabled
def test_error_buffer_size(self, cursor, unique_database):
"""Check that queries fail if the size of the Kudu client errors they generate is
greater than kudu_error_buffer_size."""
table_name = "%s.test_error_buffer_size" % unique_database
cursor.execute("create table %s (a bigint primary key) stored as kudu" % table_name)
# Insert a large number of a constant value into the table to generate many "Key
# already present" errors. 50 errors should fit inside the 1024 byte limit.
cursor.execute(
"insert into %s select 1 from functional.alltypes limit 50" % table_name)
try:
# 200 errors should overflow the 1024 byte limit.
cursor.execute(
"insert into %s select 1 from functional.alltypes limit 200" % table_name)
assert False, "Expected: 'Error overflow in Kudu session.'"
except Exception as e:
assert "Error overflow in Kudu session." in str(e)
class TestKuduClientTimeout(CustomClusterTestSuite, KuduTestSuite):
"""Kudu tests that set the Kudu client operation timeout to 1ms and expect
specific timeout exceptions. While we expect all exercised operations to take at
least 1ms, it is possible that some may not and thus the test could be flaky. If
this turns out to be the case, specific tests may need to be re-considered or
removed."""
@classmethod
def get_workload(cls):
return 'functional-query'
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="-kudu_operation_timeout_ms=1")
@SkipIfKudu.hms_integration_enabled
def test_impalad_timeout(self, vector):
"""Check impalad behavior when -kudu_operation_timeout_ms is too low."""
self.run_test_case('QueryTest/kudu-timeouts-impalad', vector)
class TestKuduHMSIntegration(CustomClusterTestSuite, KuduTestSuite):
# TODO(IMPALA-8614): parameterize the common tests in query_test/test_kudu.py
# to run with HMS integration enabled. Also avoid restarting Impala to reduce
# tests time.
"""Tests the different DDL operations when using a kudu table with Kudu's integration
with the Hive Metastore."""
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def setup_class(cls):
# Restart Kudu cluster with HMS integration enabled
KUDU_ARGS = "-hive_metastore_uris=thrift://%s" % os.environ['INTERNAL_LISTEN_HOST']
cls._restart_kudu_service(KUDU_ARGS)
super(TestKuduHMSIntegration, cls).setup_class()
@classmethod
def teardown_class(cls):
# Restart Kudu cluster with HMS integration disabled
cls._restart_kudu_service("-hive_metastore_uris=")
super(TestKuduHMSIntegration, cls).teardown_class()
@pytest.mark.execute_serially
@SkipIfKudu.no_hybrid_clock
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
def test_create_managed_kudu_tables(self, vector, unique_database):
"""Tests the Create table operation when using a kudu table with Kudu's integration
with the Hive Metastore for managed tables. Increase timeout of individual Kudu
client rpcs to avoid requests fail due to operation delay in the Hive Metastore
for managed tables (IMPALA-8856)."""
vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database)
@pytest.mark.execute_serially
@SkipIfHive3.kudu_hms_notifications_not_supported
def test_implicit_external_table_props(self, cursor, kudu_client):
"""Check that table properties added internally for external table during
table creation are as expected.
"""
db_name = cursor.conn.db_name
with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
external_table_name = "%s_external" % impala_table_name
props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
external_table_name, props))
with self.drop_impala_table_after_context(cursor, external_table_name):
cursor.execute("DESCRIBE FORMATTED %s" % external_table_name)
table_desc = [[col.strip() if col else col for col in row] for row in cursor]
# Pytest shows truncated output on failure, so print the details just in case.
LOG.info(table_desc)
assert not any("kudu.table_id" in s for s in table_desc)
assert any("Owner:" in s for s in table_desc)
assert ["", "EXTERNAL", "TRUE"] in table_desc
assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc
assert ["", "kudu.table_name", kudu_table.name] in table_desc
assert ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"] \
in table_desc
@pytest.mark.execute_serially
@SkipIfHive3.kudu_hms_notifications_not_supported
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
def test_implicit_managed_table_props(self, cursor, kudu_client, unique_database):
"""Check that table properties added internally for managed table during table
creation are as expected. Increase timeout of individual Kudu client rpcs to
avoid requests fail due to operation delay in the Hive Metastore for managed
tables (IMPALA-8856).
"""
cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
assert kudu_client.table_exists(
KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
cursor.execute("DESCRIBE FORMATTED %s.foo" % unique_database)
table_desc = [[col.strip() if col else col for col in row] for row in cursor]
# Pytest shows truncated output on failure, so print the details just in case.
LOG.info(table_desc)
assert not any("EXTERNAL" in s for s in table_desc)
assert any("Owner:" in s for s in table_desc)
assert any("kudu.table_id" in s for s in table_desc)
assert any("kudu.master_addresses" in s for s in table_desc)
assert ["Table Type:", "MANAGED_TABLE", None] in table_desc
assert ["", "kudu.table_name", "%s.foo" % unique_database] in table_desc
assert ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"] \
in table_desc
@pytest.mark.execute_serially
@SkipIfHive3.kudu_hms_notifications_not_supported
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
def test_drop_non_empty_db(self, unique_cursor, kudu_client):
"""Check that an attempt to drop a database will fail if Kudu tables are present
and that the tables remain. Increase timeout of individual Kudu client rpcs
to avoid requests fail due to operation delay in the Hive Metastore for managed
tables (IMPALA-8856).
"""
db_name = unique_cursor.conn.db_name
with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
assert kudu_client.table_exists(kudu_table.name)
unique_cursor.execute("INVALIDATE METADATA")
unique_cursor.execute("USE DEFAULT")
try:
unique_cursor.execute("DROP DATABASE %s" % db_name)
assert False
except Exception as e:
assert "One or more tables exist" in str(e)
# Dropping an empty database should succeed, once all tables
# from the database have been dropped.
assert kudu_client.table_exists(kudu_table.name)
unique_cursor.execute("DROP Table %s" % kudu_table.name)
unique_cursor.execute("DROP DATABASE %s" % db_name)
assert not kudu_client.table_exists(kudu_table.name)
@pytest.mark.execute_serially
@SkipIfHive3.kudu_hms_notifications_not_supported
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
def test_drop_db_cascade(self, unique_cursor, kudu_client):
"""Check that an attempt to drop a database cascade will succeed even if Kudu
tables are present. Make sure the corresponding managed tables are removed
from Kudu. Increase timeout of individual Kudu client rpcs to avoid requests
fail due to operation delay in the Hive Metastore for managed tables (IMPALA-8856).
"""
db_name = unique_cursor.conn.db_name
with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
assert kudu_client.table_exists(kudu_table.name)
unique_cursor.execute("INVALIDATE METADATA")
# Create a table in HDFS
hdfs_table_name = self.random_table_name()
unique_cursor.execute("""
CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % (hdfs_table_name))
unique_cursor.execute("USE DEFAULT")
unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
unique_cursor.execute("SHOW DATABASES")
assert (db_name, '') not in unique_cursor.fetchall()
assert not kudu_client.table_exists(kudu_table.name)
@pytest.mark.execute_serially
@SkipIfHive3.kudu_hms_notifications_not_supported
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000")
def test_drop_managed_kudu_table(self, cursor, kudu_client, unique_database):
"""Check that dropping a managed Kudu table should fail if the underlying
Kudu table has been dropped externally. Increase timeout of individual
Kudu client rpcs to avoid requests fail due to operation delay in the
Hive Metastore for managed tables (IMPALA-8856).
"""
impala_tbl_name = "foo"
cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name))
kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name)
assert kudu_client.table_exists(kudu_tbl_name)
kudu_client.delete_table(kudu_tbl_name)
assert not kudu_client.table_exists(kudu_tbl_name)
try:
cursor.execute("DROP TABLE %s" % kudu_tbl_name)
assert False
except Exception as e:
LOG.info(str(e))
assert "Table %s no longer exists in the Hive MetaStore." % kudu_tbl_name in str(e)
@pytest.mark.execute_serially
@SkipIfHive3.kudu_hms_notifications_not_supported
def test_drop_external_kudu_table(self, cursor, kudu_client, unique_database):
"""Check that Impala can recover from the case where the underlying Kudu table of
an external table is dropped using the Kudu client.
"""
with self.temp_kudu_table(kudu_client, [INT32], db_name=unique_database) \
as kudu_table:
# Create an external Kudu table
impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
external_table_name = "%s_external" % impala_table_name
props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
external_table_name, props))
cursor.execute("DESCRIBE %s" % (external_table_name))
assert cursor.fetchall() == \
[("a", "int", "", "true", "false", "", "AUTO_ENCODING",
"DEFAULT_COMPRESSION", "0")]
# Drop the underlying Kudu table
kudu_client.delete_table(kudu_table.name)
assert not kudu_client.table_exists(kudu_table.name)
err_msg = 'the table does not exist: table_name: "%s"' % (kudu_table.name)
try:
cursor.execute("REFRESH %s" % (external_table_name))
except Exception as e:
assert err_msg in str(e)
cursor.execute("DROP TABLE %s" % (external_table_name))
cursor.execute("SHOW TABLES")
assert (external_table_name,) not in cursor.fetchall()
@SkipIfKudu.no_hybrid_clock
@SkipIfHive3.kudu_hms_notifications_not_supported
def test_kudu_alter_table(self, vector, unique_database):
self.run_test_case('QueryTest/kudu_hms_alter', vector, use_db=unique_database)