# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest
import json
import time
import requests
from tests.common.environ import build_flavor_timeout
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
    SkipIfLocal, SkipIfHive2
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.hive_utils import HiveDbWrapper
from tests.util.event_processor_utils import EventProcessorUtils


@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfIsilon.hive
@SkipIfLocal.hive
class TestEventProcessing(CustomClusterTestSuite):
"""This class contains tests that exercise the event processing mechanism in the
catalog."""
CATALOG_URL = "http://localhost:25020"
  PROCESSING_TIMEOUT_S = 10

@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--blacklisted_dbs=testBlackListedDb "
"--blacklisted_tables=functional_parquet.testBlackListedTbl",
catalogd_args="--blacklisted_dbs=testBlackListedDb "
"--blacklisted_tables=functional_parquet.testBlackListedTbl "
"--hms_event_polling_interval_s=1")
def test_events_on_blacklisted_objects(self):
"""Executes hive queries on blacklisted database and tables and makes sure that
event processor does not error out
"""
try:
event_id_before = EventProcessorUtils.get_last_synced_event_id()
self.run_stmt_in_hive("create database testBlackListedDb")
self.run_stmt_in_hive("create table testBlackListedDb.testtbl (id int)")
self.run_stmt_in_hive(
"create table functional_parquet.testBlackListedTbl (id int, val string)"
" partitioned by (part int) stored as parquet")
self.run_stmt_in_hive(
"alter table functional_parquet.testBlackListedTbl add partition (part=1)")
# wait until all the events generated above are processed
EventProcessorUtils.wait_for_event_processing(self.hive_client)
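      # Events on blacklisted objects should be skipped without putting the event
      # processor into an error state.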
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
assert EventProcessorUtils.get_last_synced_event_id() > event_id_before
finally:
self.run_stmt_in_hive("drop database testBlackListedDb cascade")
self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl")

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
  @SkipIfHive2.acid
  def test_transactional_insert_events(self):
    """Executes 'run_test_insert_events' for transactional tables."""
    self.run_test_insert_events(is_transactional=True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
  def test_insert_events(self):
    """Executes 'run_test_insert_events' for non-transactional tables."""
    self.run_test_insert_events()

  def run_test_insert_events(self, is_transactional=False):
    """Test for insert event processing. Events are created in Hive and processed in
    Impala. The following cases are tested:
      - insert into table, for partitioned and non-partitioned tables
      - insert overwrite table, for partitioned and non-partitioned tables
      - insert into partition, for partitioned tables
    """
db_name = 'test_db'
with HiveDbWrapper(self, db_name):
# Test table with no partitions.
TBL_INSERT_NOPART = 'tbl_insert_nopart'
self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_NOPART))
last_synced_event_id = self.get_last_synced_event_id()
TBLPROPERTIES = ""
if is_transactional:
TBLPROPERTIES = "TBLPROPERTIES ('transactional'='true'," \
"'transactional_properties'='insert_only')"
self.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
% (db_name, TBL_INSERT_NOPART, TBLPROPERTIES))
# Test insert into table, this will fire an insert event.
self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
% (db_name, TBL_INSERT_NOPART))
# With MetastoreEventProcessor running, the insert event will be processed. Query the
# table from Impala.
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
# Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
assert data.split('\t') == ['101', '200']
# Test insert overwrite. Overwrite the existing value.
last_synced_event_id = self.get_last_synced_event_id()
self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
% (db_name, TBL_INSERT_NOPART))
# Make sure the event has been processed.
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
# Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
assert data.split('\t') == ['101', '201']
# Test partitioned table.
last_synced_event_id = self.get_last_synced_event_id()
TBL_INSERT_PART = 'tbl_insert_part'
self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_PART))
self.run_stmt_in_hive("create table %s.%s (id int, name string) "
"partitioned by(day int, month int, year int) %s"
% (db_name, TBL_INSERT_PART, TBLPROPERTIES))
# Insert data into partitions.
self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
"values(101, 'x')" % (db_name, TBL_INSERT_PART))
# Make sure the event has been processed.
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
# Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_PART))
assert data.split('\t') == ['101', 'x', '28', '3', '2019']
# Test inserting into existing partitions.
last_synced_event_id = self.get_last_synced_event_id()
self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
"values(102, 'y')" % (db_name, TBL_INSERT_PART))
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
# Verify that the data is present in Impala.
data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
"and year=2019" % (db_name, TBL_INSERT_PART))
assert data.split('\t') == ['2']
# Test insert overwrite into existing partitions
last_synced_event_id = self.get_last_synced_event_id()
self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
"year=2019)" "values(101, 'z')" % (db_name, TBL_INSERT_PART))
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
# Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
" year=2019 and id=101" % (db_name, TBL_INSERT_PART))
assert data.split('\t') == ['101', 'z', '28', '3', '2019']

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  @SkipIfHive2.acid
  def test_empty_partition_events_transactional(self, unique_database):
    self._run_test_empty_partition_events(unique_database, True)

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
  def test_empty_partition_events(self, unique_database):
    self._run_test_empty_partition_events(unique_database, False)

def _run_test_empty_partition_events(self, unique_database, is_transactional):
TBLPROPERTIES = ""
if is_transactional:
TBLPROPERTIES = "TBLPROPERTIES ('transactional'='true'," \
"'transactional_properties'='insert_only')"
test_tbl = unique_database + ".test_events"
self.run_stmt_in_hive("create table {0} (key string, value string) \
partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
EventProcessorUtils.wait_for_event_processing(self.hive_client)
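    # Describe the table from Impala so that it is loaded into the catalog.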
self.client.execute("describe {0}".format(test_tbl))
self.run_stmt_in_hive(
"alter table {0} add partition (year=2019)".format(test_tbl))
EventProcessorUtils.wait_for_event_processing(self.hive_client)
assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
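    # Adding an existing partition with "if not exists" is a no-op in Hive; make sure
    # the resulting event does not trip up the event processor.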
self.run_stmt_in_hive(
"alter table {0} add if not exists partition (year=2019)".format(test_tbl))
EventProcessorUtils.wait_for_event_processing(self.hive_client)
assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
self.run_stmt_in_hive(
"alter table {0} drop partition (year=2019)".format(test_tbl))
EventProcessorUtils.wait_for_event_processing(self.hive_client)
    assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
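    # Dropping a non-existent partition with "if exists" is likewise a no-op; the
    # event processor should stay ACTIVE.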
self.run_stmt_in_hive(
"alter table {0} drop if exists partition (year=2019)".format(test_tbl))
EventProcessorUtils.wait_for_event_processing(self.hive_client)
    assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  def wait_for_insert_event_processing(self, previous_event_id):
    """Waits until the event processor has finished processing insert events. Since two
    events are created for every insert done through Hive, we wait until the event id
    has advanced by at least two. Returns True if at least two events were processed
    within self.PROCESSING_TIMEOUT_S, False otherwise.
    """
    success = False
end_time = time.time() + self.PROCESSING_TIMEOUT_S
while time.time() < end_time:
new_event_id = self.get_last_synced_event_id()
if new_event_id - previous_event_id >= 2:
success = True
break
time.sleep(0.1)
# Wait for catalog update to be propagated.
time.sleep(build_flavor_timeout(2, slow_build_timeout=4))
return success

  def get_event_processor_metrics(self):
    """Scrapes the catalog's /events webpage and returns a dictionary with the event
    processor metrics."""
    response = requests.get("%s/events?json" % self.CATALOG_URL)
    assert response.status_code == requests.codes.ok
    varz_json = json.loads(response.text)
    metrics = varz_json["event_processor_metrics"].strip().splitlines()

    # Helper to strip whitespace from both elements of a (key, value) pair.
    def strip_pair(p):
      return (p[0].strip(), p[1].strip())

    # Each metric line has the form "name: value". Split on the first ':' only so
    # that values containing colons survive intact.
    pairs = [strip_pair(kv.split(':', 1)) for kv in metrics if kv]
    return dict(pairs)

  def get_last_synced_event_id(self):
    """Returns the last-synced-event-id from the event processor metrics."""
    metrics = self.get_event_processor_metrics()
    assert 'last-synced-event-id' in metrics
    return int(metrics['last-synced-event-id'])