# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import multiprocessing.pool
import pytest
import re
import threading
import time
import traceback
from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
from tests.util.event_processor_utils import EventProcessorUtils
LOG = logging.getLogger(__name__)
LOG.setLevel(level=logging.INFO)
@SkipIfFS.hive
class TestEventProcessingPerf(CustomClusterTestSuite):
"""This class contains tests to measure the event processing time on catalogd.
Measures performance for various operations with queries executed from hive and impala
clients."""
  # The parameters below are set to low values so that the tests run quickly. Increase
  # them and run the tests manually to measure performance.
db_count = 3
table_count = 3
partition_count = 100
insert_nonpartition_values_count = 100
insert_nonpartition_repeat_count = 2
  # process_events_together controls whether the events of all operations are processed
  # together. When set to True, all operations except create/drop of databases and
  # tables are processed in a single batch. When set to False, each type of operation
  # (add partitions, insert into partitions, insert into table, refresh partitions,
  # refresh tables, compute stats, etc.) is processed separately so that the time taken
  # for each type of operation can be measured.
process_events_together = False
db_prefix = "perf_db"
table_prefix = "perf_table"
stage_table = "perf_stage_table"
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestEventProcessingPerf, cls).setup_class()
def setup_method(self, method):
super(TestEventProcessingPerf, self).setup_method(method)
self.__cleanup()
def teardown_method(self, method):
self.__cleanup()
super(TestEventProcessingPerf, self).teardown_method(method)
def __cleanup(self):
self.__drop_databases()
self.client.execute("drop table if exists {}".format(self.stage_table))
self.__ensure_events_processed()
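  # Each test_perf_* method exercises one combination of client (hive/impala), table
  # type (external/transactional) and partitioning (part/non_part). The *_hierarchical
  # variants run the same workload with hierarchical event processing enabled on
  # catalogd (--enable_hierarchical_event_processing=true).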
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_hive_external_non_part_table(self):
self.__run_event_processing_tests("test_perf_hive_external_non_part_table", True,
False, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--min_event_processor_idle_ms=600000 "
"--enable_hierarchical_event_processing=true")
def test_perf_hive_external_non_part_table_hierarchical(self):
self.__run_event_processing_tests(
"test_perf_hive_external_non_part_table_hierarchical", True, False, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_hive_external_part_table(self):
self.__run_event_processing_tests("test_perf_hive_external_part_table", True,
False, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--min_event_processor_idle_ms=600000 "
"--enable_hierarchical_event_processing=true")
def test_perf_hive_external_part_table_hierarchical(self):
self.__run_event_processing_tests(
"test_perf_hive_external_part_table_hierarchical", True, False, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_hive_transact_non_part_table(self):
self.__run_event_processing_tests("test_perf_hive_transact_non_part_table", True,
True, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--min_event_processor_idle_ms=600000 "
"--enable_hierarchical_event_processing=true")
def test_perf_hive_transact_non_part_table_hierarchical(self):
self.__run_event_processing_tests(
"test_perf_hive_transact_non_part_table_hierarchical", True, True, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_hive_transact_part_table(self):
self.__run_event_processing_tests("test_perf_hive_transact_part_table", True, True,
True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--min_event_processor_idle_ms=600000 "
"--enable_hierarchical_event_processing=true")
def test_perf_hive_transact_part_table_hierarchical(self):
self.__run_event_processing_tests("test_perf_hive_transact_part_table_hierarchical",
True, True, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_impala_external_non_part_table(self):
self.__run_event_processing_tests("test_perf_impala_external_non_part_table", False,
False, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--enable_hierarchical_event_processing=true")
def test_perf_impala_external_non_part_table_hierarchical(self):
self.__run_event_processing_tests(
"test_perf_impala_external_non_part_table_hierarchical", False, False, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_impala_external_part_table(self):
self.__run_event_processing_tests("test_perf_impala_external_part_table", False,
False, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--enable_hierarchical_event_processing=true")
def test_perf_impala_external_part_table_hierarchical(self):
self.__run_event_processing_tests("test_perf_impala_external_part_table_hierarchical",
False, False, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_impala_transact_non_part_table(self):
self.__run_event_processing_tests("test_perf_impala_transact_non_part_table", False,
True, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--enable_hierarchical_event_processing=true")
def test_perf_impala_transact_non_part_table_hierarchical(self):
self.__run_event_processing_tests(
"test_perf_impala_transact_non_part_table_hierarchical", False, True, False)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true")
def test_perf_impala_transact_part_table(self):
self.__run_event_processing_tests("test_perf_impala_transact_part_table", False,
True, True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
cluster_size=1,
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0.2 "
"--enable_reload_events=true "
"--enable_hierarchical_event_processing=true")
def test_perf_impala_transact_part_table_hierarchical(self):
self.__run_event_processing_tests(
"test_perf_impala_transact_part_table_hierarchical", False, True, True)
def __run_event_processing_tests(self, case, is_hive, is_transactional, is_partitioned):
"""Method to measure time taken to create databases, tables(external if
is_transactional is false otherwise transactional), partitions(if is_partitioned is
true), insert data into tables, refresh partitions and tables, compute table stats
and drop tables and databases in the end. is_hive is used to control all the queries
to be executed on hive or impala."""
LOG.info("Test: %s::%s" % (type(self).__name__, case))
refresh_table_format = "refresh {}.{}"
refresh_partition_format = refresh_table_format + " partition (j='{}')"
stats = "analyze table {}.{} compute statistics for columns"
dyn_part_cfg = "set hive.exec.dynamic.partition.mode=nonstrict;"
dyn_part_cfg += "set hive.exec.max.dynamic.partitions={};" \
.format(self.partition_count)
dyn_part_cfg += "set hive.exec.max.dynamic.partitions.pernode={};" \
.format(self.partition_count)
if not is_hive:
stats = "compute stats {}.{}"
dyn_part_cfg = ""
create_table_query = " ".join(["create", "external" if not is_transactional else '',
"table `{}`.`{}` (i int)",
" partitioned by (j string) " if is_partitioned else '',
self.__get_transactional_tblproperties(is_transactional)])
test_self = self
class ThreadLocalClient(threading.local):
def __init__(self, is_hive, hive_server2_host_port):
        # Called for the main thread and for each thread in the pool.
self.is_hive = is_hive
self.hive_server2_host_port = hive_server2_host_port
self.name = threading.currentThread().name
LOG.info("Initializing for thread %s", self.name)
def __del__(self):
# Invoked only for main thread
LOG.info("Deleting for thread %s", self.name)
def __enter__(self):
# Invoked only for main thread
LOG.info("Entering for thread %s", self.name)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# Invoked only for main thread
LOG.info("Exiting for thread %s", self.name)
if exc_type is not None:
traceback.print_exception(exc_type, exc_val, exc_tb)
def run_query(self, query, run_with_impala=None):
        # A new client connection is created and closed for each query. If a connection
        # were kept per ThreadLocalClient instance there would be no good place to close
        # it, since only __init__ is invoked for each instance. Establishing and closing
        # a connection is fast, so doing it per query is acceptable.
is_hive = self.is_hive
if run_with_impala:
          # Override is_hive for queries such as "describe formatted" that must run
          # against Impala so that the tables get loaded on the Impala side.
is_hive = False
if is_hive:
with test_self.create_impala_client(host_port=self.hive_server2_host_port,
protocol=HS2,
is_hive=is_hive) as hive_client:
            for hive_query in query.split(';'):
              hive_client.execute(hive_query)
else:
with test_self.create_impala_client() as impala_client:
try:
handle = impala_client.execute_async(query)
is_finished = impala_client.wait_for_finished_timeout(handle, timeout=60)
assert is_finished, "Query timeout(60s): " + query
impala_client.close_query(handle)
except IMPALA_CONNECTION_EXCEPTION as e:
LOG.debug(e)
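    # Queries are issued concurrently from a pool of 8 worker threads; threading.local
    # gives each worker its own ThreadLocalClient state.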
pool = multiprocessing.pool.ThreadPool(processes=8)
with ThreadLocalClient(is_hive, self.pytest_config().option.hive_server2) as tls:
dbs = []
for iter in range(self.db_count):
dbs.append(self.db_prefix + str(iter))
tables = []
for iter1 in range(self.table_count):
table_name = self.table_prefix + str(iter1)
for iter2 in range(self.db_count):
tables.append((self.db_prefix + str(iter2), table_name))
def create_database(dbname):
tls.run_query("create database {}".format(dbname))
def create_table(table_name_tuple):
tls.run_query(create_table_query.format(table_name_tuple[0], table_name_tuple[1]))
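      # Measurement pattern used throughout: pause the event processor, run the
      # operations concurrently, then resume processing and time how long catalogd
      # takes to work through the queued events.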
start_event_id = self.__pause_event_processing()
pool.map_async(create_database, dbs).get()
pool.map_async(create_table, tables).get()
create_db_table_time = self.__process_events_now(start_event_id)
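      # Load the newly created tables on the Impala side via "describe formatted" so
      # that later operations work against loaded tables.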
def load_table(table_name_tuple):
tls.run_query("describe formatted {}.{}"
.format(table_name_tuple[0], table_name_tuple[1]), True)
pool.map_async(load_table, tables).get()
add_part_time = None
insert_into_part_time = None
insert_time = None
if is_partitioned:
# Stage table to create dynamic partitions
self.client.execute("create table {} (i int, j string)".format(self.stage_table))
insert_query = "insert into {} values {}"
values = ",".join([("(" + str(item) + ",'" + str(item) + "')")
for item in range(self.partition_count)])
self.client.execute(insert_query.format(self.stage_table, values))
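        # Process the stage-table events now so they are not counted against the
        # partition measurements below.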
self.__process_events_now(start_event_id)
# Create dynamic partitions
def add_or_insert_into_partitions(table_name_tuple):
tls.run_query("{} insert into {}.{} partition(j) select * from {}"
.format(dyn_part_cfg, table_name_tuple[0], table_name_tuple[1],
self.stage_table))
start_event_id = self.__pause_event_processing()
pool.map_async(add_or_insert_into_partitions, tables).get()
add_part_time = self.__process_events(start_event_id)
# Insert into existing partitions
start_event_id = self.__pause_event_processing()
pool.map_async(add_or_insert_into_partitions, tables).get()
insert_into_part_time = self.__process_events(start_event_id)
else:
repeat_insert_into_tables = []
for i in range(self.insert_nonpartition_repeat_count):
for table_name_tuple in tables:
repeat_insert_into_tables.append(table_name_tuple)
insert_query = "insert into {}.{} values {}"
values = ",".join([("(" + str(item) + ")")
for item in range(self.insert_nonpartition_values_count)])
def insert_into_table(table_name_tuple):
tls.run_query(insert_query
.format(table_name_tuple[0], table_name_tuple[1], values))
start_event_id = self.__pause_event_processing()
pool.map_async(insert_into_table, repeat_insert_into_tables).get()
insert_time = self.__process_events(start_event_id)
# Refresh
refresh_part_time = None
refresh_table_time = None
if not is_hive:
# Refresh partitions
if is_partitioned and not is_transactional:
partitions = []
for iter1 in range(self.table_count):
table_name = self.table_prefix + str(iter1)
for iter2 in range(self.db_count):
for iter3 in range(self.partition_count):
partitions.append((self.db_prefix + str(iter2), table_name, str(iter3)))
def refresh_partition(table_name_tuple):
tls.run_query(refresh_partition_format
.format(table_name_tuple[0], table_name_tuple[1],
table_name_tuple[2]))
start_event_id = self.__pause_event_processing()
pool.map_async(refresh_partition, partitions).get()
refresh_part_time = self.__process_events(start_event_id)
# Refresh tables
def refresh_table(table_name_tuple):
tls.run_query(refresh_table_format
.format(table_name_tuple[0], table_name_tuple[1]))
start_event_id = self.__pause_event_processing()
pool.map_async(refresh_table, tables).get()
refresh_table_time = self.__process_events(start_event_id)
# compute statistics
def compute_stats(table_name_tuple):
tls.run_query(stats.format(table_name_tuple[0], table_name_tuple[1]))
start_event_id = self.__pause_event_processing()
pool.map_async(compute_stats, tables).get()
compute_stats_time = self.__process_events(start_event_id)
total_time = None
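      # When process_events_together is True, the per-phase __process_events calls above
      # are no-ops and all accumulated events are processed here in a single batch.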
if self.process_events_together:
total_time = self.__process_events_now(start_event_id)
def drop_database(dbname):
tls.run_query("drop database if exists {} cascade".format(dbname))
start_event_id = self.__pause_event_processing()
pool.map_async(drop_database, dbs).get()
drop_db_table_time = self.__process_events_now(start_event_id)
pool.terminate()
LOG.info("[Performance] Create database and table: Event count: {}, Time taken: {} s"
.format(create_db_table_time[0], create_db_table_time[1]))
if add_part_time is not None:
LOG.info("[Performance] Add partition: Event count: {}, Time taken: {} s"
.format(add_part_time[0], add_part_time[1]))
LOG.info("[Performance] Insert into partition: Event count: {}, Time taken: {} s"
.format(insert_into_part_time[0], insert_into_part_time[1]))
if insert_time is not None:
LOG.info("[Performance] Insert: Event count: {}, Time taken: {} s"
.format(insert_time[0], insert_time[1]))
if refresh_part_time is not None:
LOG.info("[Performance] Refresh partition: Event count: {}, Time taken: {} s"
.format(refresh_part_time[0], refresh_part_time[1]))
if refresh_table_time is not None:
LOG.info("[Performance] Refresh table: Event count: {}, Time taken: {} s"
.format(refresh_table_time[0], refresh_table_time[1]))
if compute_stats_time is not None:
LOG.info("[Performance] Compute statistics: Event count: {}, Time taken: {} s"
.format(compute_stats_time[0], compute_stats_time[1]))
if total_time is not None:
LOG.info("[Performance] Processed together: Event count: {}, Time taken: {} s"
.format(total_time[0], total_time[1]))
LOG.info("[Performance] Drop table and database: Event count: {}, Time taken: {} s"
.format(drop_db_table_time[0], drop_db_table_time[1]))
def __drop_databases(self):
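    """Drops all databases created by the tests."""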
for iter in range(self.db_count):
self.client.execute("drop database if exists {} cascade"
.format(self.db_prefix + str(iter)))
def __process_events_now(self, start_event_id):
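    """Resumes event processing and returns a tuple of (number of events generated since
    start_event_id, time in seconds taken to process them)."""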
end_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
start_time = time.time()
self.__ensure_events_processed()
return end_event_id - start_event_id, time.time() - start_time
def __process_events(self, start_event_id):
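    """Processes pending events and returns (event count, time taken), unless
    process_events_together is set, in which case processing is deferred and None is
    returned."""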
if not self.process_events_together:
return self.__process_events_now(start_event_id)
def __ensure_events_processed(self):
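    """Starts the event processor (in case it was paused) and waits until all pending
    HMS events are processed."""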
self.client.execute(":event_processor('start')")
EventProcessorUtils.wait_for_event_processing(self, 100)
def __pause_event_processing(self):
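    """Pauses the event processor and returns its last synced event id, falling back to
    the current HMS notification id if the id cannot be parsed from the output."""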
output = self.client.execute(":event_processor('pause')").get_data()
lastSyncedEventId = re.search(r"LastSyncedEventId:\s*(\d+)", output)
if lastSyncedEventId:
return int(lastSyncedEventId.group(1))
else:
return EventProcessorUtils.get_current_notification_id(self.hive_client)
def __get_transactional_tblproperties(self, is_transactional):
"""Get the tblproperties for transactional tables"""
return "tblproperties ('transactional'='true'," \
"'transactional_properties'='insert_only')" if is_transactional else ""