# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

from subprocess import check_call

import pytest

from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
from tests.metadata.test_event_processing_base import TestEventProcessingBase
from tests.util.event_processor_utils import EventProcessorUtils


@SkipIfFS.hive
@SkipIfCatalogV2.hms_event_polling_disabled()
class TestEventProcessing(ImpalaTestSuite):
"""This class contains tests that exercise the event processing mechanism in the
catalog."""
CATALOG_URL = "http://localhost:25020"
PROCESSING_TIMEOUT_S = 10
  @SkipIfHive2.acid
  def test_transactional_insert_events(self, unique_database):
    """Executes 'run_test_insert_events' for transactional tables."""
    TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
        ImpalaCluster.get_e2e_test_cluster(), unique_database, is_transactional=True)

  def test_insert_events(self, unique_database):
    """Executes 'run_test_insert_events' for non-transactional tables."""
    TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
        ImpalaCluster.get_e2e_test_cluster(), unique_database)

  def test_iceberg_inserts(self):
    """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation.
    This test doesn't exercise event processing itself; it verifies that Iceberg
    INSERTs still work when HMS event polling is enabled.
    IMPALA-10736 tracks adding proper support for Hive Replication."""
    db_name = ImpalaTestSuite.get_random_name("iceberg_insert_event_db_")
    tbl_name = "ice_test"
    try:
      self.execute_query("create database if not exists {0}".format(db_name))
      self.execute_query("""
          create table {0}.{1} (i int)
          partitioned by spec (bucket(5, i))
          stored as iceberg;""".format(db_name, tbl_name))
      self.execute_query("insert into {0}.{1} values (1)".format(db_name, tbl_name))
      data = self.execute_scalar("select * from {0}.{1}".format(db_name, tbl_name))
      assert data == '1'
    finally:
      # Clean up the database even if an assertion above fails.
      self.execute_query("drop database if exists {0} cascade".format(db_name))

  @SkipIfHive2.acid
  def test_empty_partition_events_transactional(self, unique_database):
    """Runs the empty partition event tests against a transactional table."""
    self._run_test_empty_partition_events(unique_database, True)

  def test_empty_partition_events(self, unique_database):
    """Runs the empty partition event tests against a non-transactional table."""
    self._run_test_empty_partition_events(unique_database, False)

  def test_event_based_replication(self):
    """Runs the event-based replication tests from the shared base implementation."""
    TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client,
        self.client, ImpalaCluster.get_e2e_test_cluster(), self.filesystem_client)

  def _run_test_empty_partition_events(self, unique_database, is_transactional):
    """Adds and drops a partition from Hive, with and without IF NOT EXISTS/IF
    EXISTS, and verifies that Impala reflects each change once the corresponding
    HMS events have been processed."""
    test_tbl = unique_database + ".test_events"
    TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
        is_transactional)
    self.run_stmt_in_hive(
        "create table {0} (key string, value string) partitioned by (year int) "
        "stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
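    # Wait until the CREATE_TABLE event from Hive has been processed, then load the
    # table in Impala to confirm it is visible.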
    EventProcessorUtils.wait_for_event_processing(self)
    self.client.execute("describe {0}".format(test_tbl))
    self.run_stmt_in_hive(
        "alter table {0} add partition (year=2019)".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
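    # Re-adding the same partition with IF NOT EXISTS produces an event with an
    # empty partition list; the event processor must handle it and stay ACTIVE.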
    self.run_stmt_in_hive(
        "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
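    # A DROP PARTITION issued from Hive should remove the partition in Impala.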
    self.run_stmt_in_hive(
        "alter table {0} drop partition (year=2019)".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
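    # Dropping the now-missing partition with IF EXISTS also produces an event with
    # an empty partition list that must not stall the event processor.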
    self.run_stmt_in_hive(
        "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @pytest.mark.execute_serially
  def test_load_data_from_impala(self, unique_database):
    """Verifies that a LOAD DATA statement executed from Impala fires exactly one
    HMS INSERT event, for both non-partitioned and partitioned tables."""
    tbl_nopart = "tbl_nopart"
    tbl_part = "tbl_part"
    staging_dir = "/tmp/{0}".format(unique_database)
    check_call(["hdfs", "dfs", "-mkdir", staging_dir])
    try:
      self.execute_query(
          "drop table if exists {0}.{1} purge".format(unique_database, tbl_nopart))
      self.execute_query(
          "drop table if exists {0}.{1} purge".format(unique_database, tbl_part))
      self.execute_query(
          "create table {0}.{1} like functional_parquet.tinytable stored as parquet"
          .format(unique_database, tbl_nopart))
      self.execute_query(
          "create table {0}.{1} like functional_parquet.alltypessmall stored as "
          "parquet".format(unique_database, tbl_part))
      EventProcessorUtils.wait_for_event_processing(self)
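      # Stage a copy of the data files outside the table directory: LOAD DATA moves
      # the files into the table.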
      check_call([
          "hdfs", "dfs", "-cp", "/test-warehouse/tinytable_parquet", staging_dir])
      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
      self.execute_query(
          "load data inpath '{0}/tinytable_parquet' into table {1}.{2}".format(
          staging_dir, unique_database, tbl_nopart))
      # Check that exactly one INSERT event is fired after the LOAD DATA statement.
      events = EventProcessorUtils.get_next_notification(self.hive_client,
          last_event_id)
      assert len(events) == 1
      last_event = events[0]
      assert last_event.dbName == unique_database
      assert last_event.tableName == tbl_nopart
      assert last_event.eventType == "INSERT"
check_call(["hdfs", "dfs", "-cp", "/test-warehouse/alltypessmall_parquet",
staging_dir])
self.execute_query(
"alter table {0}.{1} add partition (year=2009,month=1)".format(
unique_database, tbl_part))
last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
self.execute_query(
"load data inpath '{0}/alltypessmall_parquet/year=2009/month=1' \
into table {1}.{2} partition (year=2009,month=1)".format(
staging_dir, unique_database, tbl_part))
# Check if there is an insert event fired after load data statement.
events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
assert len(events) == 1
last_event = events[0]
assert last_event.dbName == unique_database
assert last_event.tableName == tbl_part
assert last_event.eventType == "INSERT"
finally:
check_call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", staging_dir])
  def test_transact_partition_location_change_from_hive(self, unique_database):
    """IMPALA-12356: Verify that altering a partition's location from Hive is
    applied to a transactional table."""
    self.run_test_partition_location_change_from_hive(unique_database,
        "transact_alter_part_hive", True)

  def test_partition_location_change_from_hive(self, unique_database):
    """IMPALA-12356: Verify that altering a partition's location from Hive is
    applied to a non-transactional table."""
    self.run_test_partition_location_change_from_hive(unique_database,
        "alter_part_hive")

  def run_test_partition_location_change_from_hive(self, unique_database, tbl_name,
      is_transactional=False):
    fq_tbl_name = unique_database + "." + tbl_name
    TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
        is_transactional)
    # Create the table.
    self.client.execute(
        "create table %s (i int) partitioned by(j int) stored as parquet %s"
        % (fq_tbl_name, TBLPROPERTIES))
    # Insert some data into a partition.
    p1 = "j=1"
    self.client.execute("insert into table %s partition(%s) values (0),(1),(2)"
        % (fq_tbl_name, p1))
    tbl_location = self._get_table_property("Location:", fq_tbl_name)
    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
    assert [("{0}/{1}".format(tbl_location, p1),)] == partitions
    # Alter the partition location from Hive.
    new_part_location = tbl_location + "/j=2"
    self.run_stmt_in_hive("alter table %s partition(%s) set location '%s'"
        % (fq_tbl_name, p1, new_part_location))
    EventProcessorUtils.wait_for_event_processing(self)
    # Verify that the new location is reflected in Impala.
    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
    assert [(new_part_location,)] == partitions

  def _get_table_property(self, property_name, table_name):
    """Extracts a table property value from the output of DESCRIBE FORMATTED.
    Returns None if the property is missing or NULL."""
    result = self.client.execute("describe formatted {0}".format(table_name))
    for row in result.data:
      if property_name in row:
        # Rows of DESCRIBE FORMATTED output are tab-separated; the value is the
        # second field.
        row = row.split('\t')
        if row[1] == 'NULL':
          break
        return row[1].rstrip()
    return None