# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import uuid
from functools import wraps
from impala.dbapi import connect as impala_connect
from tests.util.calculation_util import get_random_id
LOG = logging.getLogger(__name__)
class PerfResultDataStore(object):
  '''Allows for interaction with the Impala performance database.

  To reduce the number of calls to the database, we cache query_id, workload_id,
  file_type_id and run_info_id in _cache. If an id cannot be found in the cache, we
  query the database; objects not yet in the database are inserted automatically.

  To avoid creating a new file per query, we first write execution results and query
  profiles to staging tables. Once all results have been inserted, we copy the data
  from the staging tables into ExecutionResults and RuntimeProfiles. This way, we
  create on the order of one Parquet file per run.'''
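
  # Example usage (an illustrative sketch: the host, port, database name and all
  # argument values below are placeholders, not values defined by this module):
  #
  #   with PerfResultDataStore('impala-host', 21050, 'perf_results') as store:
  #     store.insert_execution_result(
  #         query_name='primitive_q1', query_string='select 1',
  #         workload_name='tpch', scale_factor='300', file_format='parquet',
  #         compression_codec='snap', compression_type='block', num_clients=1,
  #         num_iterations=3, cluster_name='test-cluster',
  #         executor_name='query_executor', exec_time=1.23,
  #         run_date='2016-01-01 00:00:00', version='impalad 2.6.0',
  #         run_info='nightly perf run', user_name='jenkins',
  #         runtime_profile='<full runtime profile text>')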
def __init__(self, host, port, database_name, use_secure_connection=False):
if use_secure_connection:
self._connection = impala_connect(host, port, use_ssl=True, auth_mechanism='GSSAPI')
else:
self._connection = impala_connect(host, port)
self._database_name = database_name
self._staging_exec_result_table = None
self._staging_profile_table = None
self._cache = {}
def __enter__(self):
self._staging_exec_result_table = 'ExecutionResultsStaging_' + get_random_id(5)
self._staging_profile_table = 'RuntimeProfilesStaging_' + get_random_id(5)
self._create_new_table_as('ExecutionResults', self._staging_exec_result_table)
self._create_new_table_as('RuntimeProfiles', self._staging_profile_table)
return self
  def __exit__(self, exc_type, exc_value, traceback):
    # Publish the staged results, then clean up. Returning None (falsy) means any
    # in-flight exception propagates after the staging tables are dropped.
    self._copy_to_table(self._staging_exec_result_table, 'ExecutionResults')
    self._copy_to_table(self._staging_profile_table, 'RuntimeProfiles')
    self._drop_table(self._staging_exec_result_table)
    self._drop_table(self._staging_profile_table)
def insert_execution_result(self,
query_name,
query_string,
workload_name,
scale_factor,
file_format,
compression_codec,
compression_type,
num_clients,
num_iterations,
cluster_name,
executor_name,
exec_time,
run_date,
version,
run_info,
user_name,
runtime_profile):
LOG.debug('About to insert execution result.')
query_id = self._get_query_id(query_name, query_string)
workload_id = self._get_workload_id(workload_name, scale_factor)
file_type_id = self._get_file_type_id(
file_format, compression_codec, compression_type)
run_info_id = self._get_run_info_id(run_info, user_name)
result_id = self._insert_execution_result(
run_info_id, query_id, workload_id, file_type_id,
num_clients, num_iterations, cluster_name, executor_name,
exec_time, run_date, version)
self._insert_runtime_profile(result_id, runtime_profile)
def _get_query_id(self, query_name, query_string):
''' Gets the query_id for the given query name and query text. If not found in the
database, this information gets inserted into the database. Results are cached. '''
if (query_name, query_string) not in self._cache:
self._cache[(query_name, query_string)] = \
self._get_query_id_remote(query_name, query_string) or \
self._insert_query_info(query_name, query_string, '')
      # The empty string above is the notes field; no notes are inserted by default.
return self._cache[(query_name, query_string)]
def _get_workload_id(self, workload_name, scale_factor):
''' Gets the workload_id for the given workload / scale factor. If not found in the
database, this information gets inserted into the database. Results are cached.'''
if (workload_name, scale_factor) not in self._cache:
self._cache[(workload_name, scale_factor)] = \
self._get_workload_id_remote(workload_name, scale_factor) or \
self._insert_workload_info(workload_name, scale_factor)
return self._cache[(workload_name, scale_factor)]
def _get_file_type_id(self, file_format, compression_codec, compression_type):
    ''' Gets the file_type_id for the given file_format / compression codec / compression
type combination. If not found in the database, this information gets inserted into
the database. Results are cached.'''
if (file_format, compression_codec, compression_type) not in self._cache:
self._cache[(file_format, compression_codec, compression_type)] = \
self._get_file_type_id_remote(
file_format, compression_codec, compression_type) or \
self._insert_filetype_info(file_format, compression_codec, compression_type)
return self._cache[(file_format, compression_codec, compression_type)]
def _get_run_info_id(self, run_info, user):
'''Gets the run_info id for the given run_info and user. Creates a new entry for each
(run_info, user) pair. Results are cached.'''
if (run_info, user) not in self._cache:
self._cache[(run_info, user)] = self._insert_run_info(run_info, user)
return self._cache[(run_info, user)]
  def _cursor_wrapper(function):
    ''' Handles the common initialize/close pattern for cursor objects '''
    @wraps(function)
    def wrapper(self, *args, **kwargs):
      cursor = self._connection.cursor()
      try:
        cursor.execute('use {0}'.format(self._database_name))
        return function(self, *args, cursor=cursor, **kwargs)
      finally:
        # Close the cursor even if the wrapped function raises.
        cursor.close()
    return wrapper
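
  # Each method below decorated with _cursor_wrapper receives an open cursor on
  # self._database_name through its `cursor` keyword argument; callers invoke these
  # methods without supplying one.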
@_cursor_wrapper
def _get_query_id_remote(self, query_name, query_string, cursor):
cursor.execute('''
select query_id
from Queries where name="{0}"
and query_string="{1}"'''.format(query_name, query_string))
query_id = cursor.fetchone()
return query_id[0] if query_id else None
@_cursor_wrapper
def _get_workload_id_remote(self, workload, scale_factor, cursor):
cursor.execute('''
select workload_id
from Workloads where name="{0}"
and scale_factor="{1}"'''.format(workload, scale_factor))
workload_id = cursor.fetchone()
return workload_id[0] if workload_id else None
@_cursor_wrapper
def _get_file_type_id_remote(self,
file_format,
compression_codec,
compression_type,
cursor):
cursor.execute('''
select file_type_id
from FileTypes where file_format="{0}"
and compression_codec="{1}" and compression_type="{2}"
        '''.format(file_format, compression_codec, compression_type))
file_type_id = cursor.fetchone()
return file_type_id[0] if file_type_id else None
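
  # NOTE: _get_run_info_id_remote is currently unused; _get_run_info_id above always
  # inserts a fresh row, giving each (run_info, user) pair its own RunInfos entry.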
@_cursor_wrapper
def _get_run_info_id_remote(self, run_info, user, cursor):
cursor.execute('''
select run_info_id
from RunInfos where run_info="{0}" and user="{1}"'''.format(run_info, user))
run_info_id = cursor.fetchone()
return run_info_id[0] if run_info_id else None
@_cursor_wrapper
def _create_new_table_as(self, old_table, new_table, cursor):
cursor.execute('create table {0} like {1}'.format(new_table, old_table))
@_cursor_wrapper
def _copy_to_table(self, source_table, destination_table, cursor):
cursor.execute('insert into table {0} select * from {1}'.format(
destination_table, source_table))
@_cursor_wrapper
def _drop_table(self, table, cursor):
cursor.execute('drop table {0}'.format(table))
@_cursor_wrapper
def _insert_execution_result(self,
run_info_id, query_id, workload_id, file_type_id,
num_clients, num_iterations, cluster_name, executor_name,
exec_time, run_date, version, cursor):
result_id = str(uuid.uuid1())
insert_query = '''insert into {0} (result_id,
run_info_id, query_id, workload_id, file_type_id,
num_clients, num_iterations, cluster_name, executor_name,
exec_time, run_date, version)
values("{1}",
"{2}", "{3}", "{4}", "{5}",
{6}, {7}, "{8}", "{9}",
{10}, cast("{11}" as timestamp), "{12}")
'''.format(self._staging_exec_result_table, result_id,
run_info_id, query_id, workload_id, file_type_id,
num_clients, num_iterations, cluster_name, executor_name,
exec_time, run_date, version)
cursor.execute(insert_query)
return result_id
@_cursor_wrapper
def _insert_query_info(self, name, query_string, notes, cursor):
LOG.debug('Inserting Query: {0}'.format(name))
query_id = str(uuid.uuid1())
insert_statement = '''
insert into Queries (query_id, name, query_string, notes)
values("{0}", "{1}", "{2}", "{3}")'''.format(
query_id, name, query_string, notes)
cursor.execute(insert_statement)
return query_id
@_cursor_wrapper
def _insert_workload_info(self, name, scale_factor, cursor):
LOG.debug('Inserting Workload: {0}, {1}'.format(name, scale_factor))
workload_id = str(uuid.uuid1())
cursor.execute('''
insert into Workloads (workload_id, name, scale_factor)
values("{0}", "{1}", "{2}")'''.format(workload_id, name, scale_factor))
return workload_id
@_cursor_wrapper
def _insert_filetype_info(
self, file_format, compression_codec, compression_type, cursor):
LOG.debug('Inserting File Type: {0}, {1}, {2}'.format(
file_format, compression_codec, compression_type))
file_type_id = str(uuid.uuid1())
cursor.execute('''
insert into FileTypes
(file_type_id, file_format, compression_codec, compression_type)
values("{0}", "{1}", "{2}", "{3}")'''.format(
file_type_id, file_format, compression_codec, compression_type))
return file_type_id
@_cursor_wrapper
def _insert_run_info(self, run_info, user, cursor):
LOG.debug('Inserting Run Info: {0}, User: {1}'.format(run_info, user))
run_info_id = str(uuid.uuid1())
cursor.execute('''
insert into RunInfos (run_info_id, run_info, user)
values ("{0}", "{1}", "{2}")'''.format(run_info_id, run_info, user))
return run_info_id
@_cursor_wrapper
def _insert_runtime_profile(self, result_id, profile, cursor):
LOG.debug('Inserting Runtime Profile')
cursor.execute('''
insert into {0} (result_id, profile)
values ("{1}", "{2}")'''.format(self._staging_profile_table, result_id, profile))