| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # The base class that should be used for almost all Impala tests |
| |
| import grp |
| import logging |
| import os |
| import pprint |
| import pwd |
| import pytest |
| import re |
| import shutil |
| import subprocess |
| import tempfile |
| import time |
| from functools import wraps |
| from getpass import getuser |
| from random import choice |
| from subprocess import check_call |
| |
| from tests.common.base_test_suite import BaseTestSuite |
| from tests.common.impala_connection import create_connection |
| from tests.common.impala_service import ImpaladService |
| from tests.common.test_dimensions import ( |
| ALL_BATCH_SIZES, |
| ALL_CLUSTER_SIZES, |
| ALL_DISABLE_CODEGEN_OPTIONS, |
| ALL_NODES_ONLY, |
| TableFormatInfo, |
| create_exec_option_dimension, |
| get_dataset_from_workload, |
| load_table_info_dimension) |
| from tests.common.test_result_verifier import ( |
| apply_error_match_filter, |
| try_compile_regex, |
| verify_raw_results, |
| verify_runtime_profile) |
| from tests.common.test_vector import ImpalaTestDimension |
| from tests.performance.query import Query |
| from tests.performance.query_exec_functions import execute_using_jdbc |
| from tests.performance.query_executor import JdbcQueryExecConfig |
| from tests.util.filesystem_utils import ( |
| IS_S3, |
| IS_ADLS, |
| S3_BUCKET_NAME, |
| ADLS_STORE_NAME, |
| FILESYSTEM_PREFIX) |
| |
| from tests.util.hdfs_util import ( |
| HdfsConfig, |
| get_hdfs_client, |
| get_hdfs_client_from_conf, |
| NAMENODE) |
| from tests.util.s3_util import S3Client |
| from tests.util.test_file_parser import ( |
| QueryTestSectionReader, |
| parse_query_test_file, |
| write_test_file) |
| from tests.util.thrift_util import create_transport |
| |
| # Imports required for Hive Metastore Client |
| from hive_metastore import ThriftHiveMetastore |
| from thrift.protocol import TBinaryProtocol |
| |
| # Initializing the logger before conditional imports, since we will need it |
| # for them. |
| logging.basicConfig(level=logging.INFO, format='-- %(message)s') |
| LOG = logging.getLogger('impala_test_suite') |
| |
| # The ADLS python client isn't downloaded when ADLS isn't the target FS, so do a |
| # conditional import. |
| if IS_ADLS: |
| try: |
| from tests.util.adls_util import ADLSClient |
| except ImportError: |
| LOG.error("Need the ADLSClient for testing with ADLS") |
| |
| IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',') |
| assert len(IMPALAD_HOST_PORT_LIST) > 0, 'Must specify at least 1 impalad to target' |
| IMPALAD = IMPALAD_HOST_PORT_LIST[0] |
| IMPALAD_HS2_HOST_PORT =\ |
| IMPALAD.split(':')[0] + ":" + pytest.config.option.impalad_hs2_port |
| HIVE_HS2_HOST_PORT = pytest.config.option.hive_server2 |
| WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR'] |
| HDFS_CONF = HdfsConfig(pytest.config.option.minicluster_xml_conf) |
| TARGET_FILESYSTEM = os.getenv("TARGET_FILESYSTEM") or "hdfs" |
| IMPALA_HOME = os.getenv("IMPALA_HOME") |
| EE_TEST_LOGS_DIR = os.getenv("IMPALA_EE_TEST_LOGS_DIR") |
# Match any SET statement. Assume that query option names contain only
# letters, underscores and digits after the first character.
| # The statement may include SQL line comments starting with --, which we need to |
| # strip out. The test file parser already strips out comments starting with #. |
| COMMENT_LINES_REGEX = r'(?:\s*--.*\n)*' |
| SET_PATTERN = re.compile( |
| COMMENT_LINES_REGEX + r'\s*set\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*=*', re.I) |
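# For illustration (not executed): SET_PATTERN matches statements such as
# 'set explain_level=2' or '-- a comment\nSET mem_limit = 2g' (case-insensitive),
# with group(1) capturing the option name, e.g. 'explain_level' or 'mem_limit'.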
| |
# Base class for Impala tests. All Impala test cases should inherit from this class.
| class ImpalaTestSuite(BaseTestSuite): |
| @classmethod |
| def add_test_dimensions(cls): |
| """ |
| A hook for adding additional dimensions. |
| |
| By default load the table_info and exec_option dimensions, but if a test wants to |
| add more dimensions or different dimensions they can override this function. |
| """ |
| super(ImpalaTestSuite, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_dimension( |
| cls.create_table_info_dimension(cls.exploration_strategy())) |
| cls.ImpalaTestMatrix.add_dimension(cls.__create_exec_option_dimension()) |
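
  # An override in a subclass might look like the following (an illustrative
  # sketch; the suite name and dimension values are assumptions):
  #
  #   @classmethod
  #   def add_test_dimensions(cls):
  #     super(MyTestSuite, cls).add_test_dimensions()
  #     cls.ImpalaTestMatrix.add_dimension(
  #         ImpalaTestDimension('compression_level', 1, 2, 3))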
| |
| @classmethod |
| def setup_class(cls): |
| """Setup section that runs before each test suite""" |
    cls.hive_client, cls.client = None, None
    # Create a Hive Metastore client (used for executing some test SETUP steps).
| metastore_host, metastore_port = pytest.config.option.metastore_server.split(':') |
| trans_type = 'buffered' |
| if pytest.config.option.use_kerberos: |
| trans_type = 'kerberos' |
| cls.hive_transport = create_transport( |
| host=metastore_host, |
| port=metastore_port, |
| service=pytest.config.option.hive_service_name, |
| transport_type=trans_type) |
| protocol = TBinaryProtocol.TBinaryProtocol(cls.hive_transport) |
| cls.hive_client = ThriftHiveMetastore.Client(protocol) |
| cls.hive_transport.open() |
| |
| # Create a connection to Impala. |
| cls.client = cls.create_impala_client(IMPALAD) |
| |
| # Default query options are populated on demand. |
| cls.default_query_options = {} |
| |
| cls.impalad_test_service = cls.create_impala_service() |
| cls.hdfs_client = cls.create_hdfs_client() |
| cls.filesystem_client = cls.hdfs_client |
| if IS_S3: |
| cls.filesystem_client = S3Client(S3_BUCKET_NAME) |
| elif IS_ADLS: |
| cls.filesystem_client = ADLSClient(ADLS_STORE_NAME) |
| |
| @classmethod |
| def teardown_class(cls): |
| """Setup section that runs after each test suite""" |
| # Cleanup the Impala and Hive Metastore client connections |
| if cls.hive_transport: |
| cls.hive_transport.close() |
| |
| if cls.client: |
| cls.client.close() |
| |
| @classmethod |
| def create_impala_client(cls, host_port=IMPALAD): |
| client = create_connection(host_port=host_port, |
| use_kerberos=pytest.config.option.use_kerberos) |
| client.connect() |
| return client |
| |
| @classmethod |
| def create_impala_service(cls, host_port=IMPALAD, webserver_port=25000): |
| host, port = host_port.split(':') |
| return ImpaladService(host, beeswax_port=port, webserver_port=webserver_port) |
| |
| @classmethod |
| def create_hdfs_client(cls): |
| if pytest.config.option.namenode_http_address is None: |
| hdfs_client = get_hdfs_client_from_conf(HDFS_CONF) |
| else: |
| host, port = pytest.config.option.namenode_http_address.split(":") |
| hdfs_client = get_hdfs_client(host, port) |
| return hdfs_client |
| |
| @classmethod |
| def all_db_names(cls): |
| results = cls.client.execute("show databases").data |
| # Extract first column - database name |
| return [row.split("\t")[0] for row in results] |
| |
| @classmethod |
| def cleanup_db(cls, db_name, sync_ddl=1): |
| cls.client.execute("use default") |
| cls.client.set_configuration({'sync_ddl': sync_ddl}) |
| cls.client.execute("drop database if exists `" + db_name + "` cascade") |
| |
| def __restore_query_options(self, query_options_changed, impalad_client): |
| """ |
    Restore the modified query options to their default values.
    """
    # Populate the default query options if they have not been fetched yet.
| if not self.default_query_options: |
| try: |
| query_options = impalad_client.get_default_configuration() |
| for query_option in query_options: |
| self.default_query_options[query_option.key.upper()] = query_option.value |
| except Exception as e: |
| LOG.info('Unexpected exception when getting default query options: ' + str(e)) |
| return |
| # Restore all the changed query options. |
| for query_option in query_options_changed: |
| query_option = query_option.upper() |
| if not query_option in self.default_query_options: |
| continue |
| default_val = self.default_query_options[query_option] |
| query_str = 'SET ' + query_option + '="' + default_val + '"' |
| try: |
| impalad_client.execute(query_str) |
| except Exception as e: |
| LOG.info('Unexpected exception when executing ' + query_str + ' : ' + str(e)) |
| |
| def get_impala_partition_info(self, table_name, *include_fields): |
| """ |
    Find information about partitions of a table, as returned by a SHOW PARTITIONS
    statement. Return a list that contains one tuple for each partition.

    If 'include_fields' is not specified, the tuples will contain all the fields
    returned by SHOW PARTITIONS. Otherwise, return only those fields whose names
    are listed in 'include_fields'. Field names are compared case-insensitively.
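
    Illustrative example (table and values assumed): for a table partitioned by
    'year', calling get_impala_partition_info(tbl, 'year', '#Rows') might return
    [('2009', '-1'), ('2010', '-1')].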
| """ |
| exec_result = self.client.execute('show partitions %s' % table_name) |
| fieldSchemas = exec_result.schema.fieldSchemas |
| fields_dict = {} |
| for idx, fs in enumerate(fieldSchemas): |
| fields_dict[fs.name.lower()] = idx |
| |
| rows = exec_result.get_data().split('\n') |
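    # Drop the last line: SHOW PARTITIONS emits a final 'Total' summary row
    # that does not describe a partition.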
| rows.pop() |
| fields_idx = [] |
| for fn in include_fields: |
| fn = fn.lower() |
| assert fn in fields_dict, 'Invalid field: %s' % fn |
| fields_idx.append(fields_dict[fn]) |
| |
| result = [] |
| for row in rows: |
| fields = row.split('\t') |
| if not fields_idx: |
| result_fields = fields |
| else: |
| result_fields = [] |
| for i in fields_idx: |
| result_fields.append(fields[i]) |
| result.append(tuple(result_fields)) |
| return result |
| |
| def __verify_exceptions(self, expected_strs, actual_str, use_db): |
| """ |
    Verifies that at least one of the strings in 'expected_strs' is either:
| * A row_regex: line that matches the actual exception string 'actual_str' |
| * A substring of the actual exception string 'actual_str'. |
| """ |
| actual_str = actual_str.replace('\n', '') |
| for expected_str in expected_strs: |
| # In error messages, some paths are always qualified and some are not. |
| # So, allow both $NAMENODE and $FILESYSTEM_PREFIX to be used in CATCH. |
| expected_str = expected_str.strip() \ |
| .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \ |
| .replace('$NAMENODE', NAMENODE) \ |
| .replace('$IMPALA_HOME', IMPALA_HOME) |
| if use_db: expected_str = expected_str.replace('$DATABASE', use_db) |
| # Strip newlines so we can split error message into multiple lines |
| expected_str = expected_str.replace('\n', '') |
| expected_regex = try_compile_regex(expected_str) |
| if expected_regex: |
| if expected_regex.match(actual_str): return |
| else: |
| # Not a regex - check if expected substring is present in actual. |
| if expected_str in actual_str: return |
| assert False, 'Unexpected exception string. Expected: %s\nNot found in actual: %s' % \ |
| (expected_str, actual_str) |
| |
| def __verify_results_and_errors(self, vector, test_section, result, use_db): |
| """Verifies that both results and error sections are as expected. Rewrites both |
| by replacing $NAMENODE, $DATABASE and $IMPALA_HOME with their actual values, and |
| optionally rewriting filenames with __HDFS_FILENAME__, to ensure that expected and |
| actual values are easily compared. |
| """ |
| replace_filenames_with_placeholder = True |
| for section_name in ('RESULTS', 'ERRORS'): |
| if section_name in test_section: |
| if "$NAMENODE" in test_section[section_name]: |
| replace_filenames_with_placeholder = False |
| test_section[section_name] = test_section[section_name] \ |
| .replace('$NAMENODE', NAMENODE) \ |
| .replace('$IMPALA_HOME', IMPALA_HOME) \ |
| .replace('$USER', getuser()) |
| if use_db: |
| test_section[section_name] = test_section[section_name].replace('$DATABASE', use_db) |
| verify_raw_results(test_section, result, vector.get_value('table_format').file_format, |
| pytest.config.option.update_results, |
| replace_filenames_with_placeholder) |
| |
| |
| def run_test_case(self, test_file_name, vector, use_db=None, multiple_impalad=False, |
| encoding=None, test_file_vars=None): |
| """ |
    Runs the queries in the specified test file based on the test vector values.
| |
    Runs the query targeting the file format/compression and the exec options
    specified in the test vector. If multiple_impalad=True, a connection to a
    random impalad will be chosen to execute each test section. Otherwise, the
    default impalad client will be used. Additionally, the encoding for all test
    data can be specified using the 'encoding' parameter. This is useful when
    data is ingested in a different encoding (e.g. latin). If not set, the
    default system encoding will be used.
| If a dict 'test_file_vars' is provided, then all keys will be replaced with their |
| values in queries before they are executed. Callers need to avoid using reserved key |
| names, see 'reserved_keywords' below. |
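
    Illustrative call (the database variable is an assumption):
      self.run_test_case('QueryTest/my-test', vector,
                         test_file_vars={'$UNIQUE_DB': unique_database})
    replaces every occurrence of $UNIQUE_DB in the file with the value of
    'unique_database' before execution.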
| """ |
| table_format_info = vector.get_value('table_format') |
| exec_options = vector.get_value('exec_option') |
| |
| # Resolve the current user's primary group name. |
| group_id = pwd.getpwnam(getuser()).pw_gid |
| group_name = grp.getgrgid(group_id).gr_name |
| |
| target_impalad_clients = list() |
| if multiple_impalad: |
| target_impalad_clients =\ |
| map(ImpalaTestSuite.create_impala_client, IMPALAD_HOST_PORT_LIST) |
| else: |
| target_impalad_clients = [self.client] |
| |
| # Change the database to reflect the file_format, compression codec etc, or the |
| # user specified database for all targeted impalad. |
| for impalad_client in target_impalad_clients: |
| ImpalaTestSuite.change_database(impalad_client, |
| table_format_info, use_db, pytest.config.option.scale_factor) |
| impalad_client.set_configuration(exec_options) |
| |
| sections = self.load_query_test_file(self.get_workload(), test_file_name, |
| encoding=encoding) |
| for test_section in sections: |
| if 'SHELL' in test_section: |
| assert len(test_section) == 1, \ |
| "SHELL test sections can't contain other sections" |
| cmd = test_section['SHELL']\ |
| .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)\ |
| .replace('$IMPALA_HOME', IMPALA_HOME) |
| if use_db: cmd = cmd.replace('$DATABASE', use_db) |
| LOG.info("Shell command: " + cmd) |
| check_call(cmd, shell=True) |
| continue |
| |
| if 'QUERY' not in test_section: |
| assert 0, 'Error in test file %s. Test cases require a -- QUERY section.\n%s' %\ |
| (test_file_name, pprint.pformat(test_section)) |
| |
| if 'SETUP' in test_section: |
| self.execute_test_case_setup(test_section['SETUP'], table_format_info) |
| |
| # TODO: support running query tests against different scale factors |
| query = QueryTestSectionReader.build_query(test_section['QUERY'] |
| .replace('$GROUP_NAME', group_name) |
| .replace('$IMPALA_HOME', IMPALA_HOME) |
| .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) |
| .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str())) |
| if use_db: query = query.replace('$DATABASE', use_db) |
| |
| reserved_keywords = ["$DATABASE", "$FILESYSTEM_PREFIX", "$GROUP_NAME", |
| "$IMPALA_HOME", "$NAMENODE", "$QUERY", "$SECONDARY_FILESYSTEM"] |
| |
| if test_file_vars: |
| for key, value in test_file_vars.iteritems(): |
| if key in reserved_keywords: |
| raise RuntimeError("Key {0} is reserved".format(key)) |
| query = query.replace(key, value) |
| |
| if 'QUERY_NAME' in test_section: |
| LOG.info('Query Name: \n%s\n' % test_section['QUERY_NAME']) |
| |
| # Support running multiple queries within the same test section, only verifying the |
| # result of the final query. The main use case is to allow for 'USE database' |
| # statements before a query executes, but it is not limited to that. |
| # TODO: consider supporting result verification of all queries in the future |
| result = None |
| target_impalad_client = choice(target_impalad_clients) |
| query_options_changed = [] |
| try: |
| user = None |
| if 'USER' in test_section: |
| # Create a new client so the session will use the new username. |
| user = test_section['USER'].strip() |
| target_impalad_client = self.create_impala_client() |
| for query in query.split(';'): |
| set_pattern_match = SET_PATTERN.match(query) |
          if set_pattern_match is not None:
| query_options_changed.append(set_pattern_match.groups()[0]) |
| result = self.__execute_query(target_impalad_client, query, user=user) |
| except Exception as e: |
| if 'CATCH' in test_section: |
| self.__verify_exceptions(test_section['CATCH'], str(e), use_db) |
| continue |
| raise |
| finally: |
| if len(query_options_changed) > 0: |
| self.__restore_query_options(query_options_changed, target_impalad_client) |
| |
| if 'CATCH' in test_section and '__NO_ERROR__' not in test_section['CATCH']: |
| expected_str = " or ".join(test_section['CATCH']).strip() \ |
| .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \ |
| .replace('$NAMENODE', NAMENODE) \ |
| .replace('$IMPALA_HOME', IMPALA_HOME) |
| assert False, "Expected exception: %s" % expected_str |
| |
| assert result is not None |
| assert result.success |
| |
| # Decode the results read back if the data is stored with a specific encoding. |
| if encoding: result.data = [row.decode(encoding) for row in result.data] |
| # Replace $NAMENODE in the expected results with the actual namenode URI. |
| if 'RESULTS' in test_section: |
        # Combining 'RESULTS' with 'DML_RESULTS' is currently unsupported because
| # __verify_results_and_errors calls verify_raw_results which always checks |
| # ERRORS, TYPES, LABELS, etc. which doesn't make sense if there are two |
| # different result sets to consider (IMPALA-4471). |
| assert 'DML_RESULTS' not in test_section |
| self.__verify_results_and_errors(vector, test_section, result, use_db) |
| else: |
| # TODO: Can't validate errors without expected results for now. |
| assert 'ERRORS' not in test_section,\ |
| "'ERRORS' sections must have accompanying 'RESULTS' sections" |
| # If --update_results, then replace references to the namenode URI with $NAMENODE. |
| if pytest.config.option.update_results and 'RESULTS' in test_section: |
| test_section['RESULTS'] = test_section['RESULTS'] \ |
| .replace(NAMENODE, '$NAMENODE') \ |
| .replace('$IMPALA_HOME', IMPALA_HOME) |
| if 'RUNTIME_PROFILE_%s' % table_format_info.file_format in test_section: |
| # If this table format has a RUNTIME_PROFILE section specifically for it, evaluate |
| # that section and ignore any general RUNTIME_PROFILE sections. |
| verify_runtime_profile( |
| test_section['RUNTIME_PROFILE_%s' % table_format_info.file_format], |
| result.runtime_profile) |
| elif 'RUNTIME_PROFILE' in test_section: |
| verify_runtime_profile(test_section['RUNTIME_PROFILE'], result.runtime_profile) |
| |
| if 'DML_RESULTS' in test_section: |
| assert 'ERRORS' not in test_section |
        # The limit is specified to ensure the queries aren't unbounded. We shouldn't
        # have test files that check the contents of tables larger than that anyway.
| dml_results_query = "select * from %s limit 1000" % \ |
| test_section['DML_RESULTS_TABLE'] |
| dml_result = self.__execute_query(target_impalad_client, dml_results_query) |
| verify_raw_results(test_section, dml_result, |
| vector.get_value('table_format').file_format, |
| pytest.config.option.update_results, result_section='DML_RESULTS') |
| if pytest.config.option.update_results: |
| output_file = os.path.join(EE_TEST_LOGS_DIR, |
| test_file_name.replace('/','_') + ".test") |
| write_test_file(output_file, sections, encoding=encoding) |
| |
| def execute_test_case_setup(self, setup_section, table_format): |
| """ |
| Executes a test case 'SETUP' section |
| |
| The test case 'SETUP' section is mainly used for insert tests. These tests need to |
| have some actions performed before each test case to ensure the target tables are |
| empty. The current supported setup actions: |
| RESET <table name> - Drop and recreate the table |
| DROP PARTITIONS <table name> - Drop all partitions from the table |
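
    An illustrative SETUP section in a .test file (the table name is assumed):
      ---- SETUP
      RESET alltypes_insert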
| """ |
| setup_section = QueryTestSectionReader.build_query(setup_section) |
| for row in setup_section.split('\n'): |
| row = row.lstrip() |
| if row.startswith('RESET'): |
| db_name, table_name = QueryTestSectionReader.get_table_name_components(\ |
| table_format, row.split('RESET')[1]) |
| self.__reset_table(db_name, table_name) |
| self.client.execute("invalidate metadata " + db_name + "." + table_name) |
| elif row.startswith('DROP PARTITIONS'): |
| db_name, table_name = QueryTestSectionReader.get_table_name_components(\ |
| table_format, row.split('DROP PARTITIONS')[1]) |
| self.__drop_partitions(db_name, table_name) |
| self.client.execute("invalidate metadata " + db_name + "." + table_name) |
| else: |
| assert False, 'Unsupported setup command: %s' % row |
| |
| @classmethod |
| def change_database(cls, impala_client, table_format=None, |
| db_name=None, scale_factor=None): |
    if db_name is None:
      assert table_format is not None
| db_name = QueryTestSectionReader.get_db_name(table_format, |
| scale_factor if scale_factor else '') |
| query = 'use %s' % db_name |
| # Clear the exec_options before executing a USE statement. |
| # The USE statement should not fail for negative exec_option tests. |
| impala_client.clear_configuration() |
| impala_client.execute(query) |
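
  # Illustrative call: ImpalaTestSuite.change_database(client,
  # vector.get_value('table_format')) switches the session to a database such
  # as 'functional_parquet', derived from the vector's table format.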
| |
| def execute_wrapper(function): |
| """ |
| Issues a use database query before executing queries. |
| |
| Database names are dependent on the input format for table, which the table names |
| remaining the same. A use database is issued before query execution. As such, |
| database names need to be build pre execution, this method wraps around the different |
| execute methods and provides a common interface to issue the proper use command. |
| """ |
| @wraps(function) |
| def wrapper(*args, **kwargs): |
| table_format = None |
| if kwargs.get('table_format'): |
| table_format = kwargs.get('table_format') |
| del kwargs['table_format'] |
| if kwargs.get('vector'): |
| table_format = kwargs.get('vector').get_value('table_format') |
| del kwargs['vector'] |
| # self is the implicit first argument |
| if table_format is not None: |
| args[0].change_database(args[0].client, table_format) |
| return function(*args, **kwargs) |
| return wrapper |
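
  # Illustrative use of a wrapped method (query text assumed):
  #   self.execute_query("select count(*) from alltypes", vector=vector)
  # issues the appropriate USE statement for the vector's table format before
  # running the query.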
| |
| @classmethod |
| @execute_wrapper |
| def execute_query_expect_success(cls, impalad_client, query, query_options=None): |
| """Executes a query and asserts if the query fails""" |
| result = cls.__execute_query(impalad_client, query, query_options) |
| assert result.success |
| return result |
| |
| @execute_wrapper |
| def execute_query_expect_failure(self, impalad_client, query, query_options=None): |
| """Executes a query and asserts if the query succeeds""" |
| result = None |
| try: |
| result = self.__execute_query(impalad_client, query, query_options) |
    except Exception as e:
| return e |
| |
| assert not result.success, "No failure encountered for query %s" % query |
| return result |
| |
| @execute_wrapper |
| def execute_query(self, query, query_options=None): |
| return self.__execute_query(self.client, query, query_options) |
| |
| def execute_query_using_client(self, client, query, vector): |
| self.change_database(client, vector.get_value('table_format')) |
| return client.execute(query) |
| |
| def execute_query_async_using_client(self, client, query, vector): |
| self.change_database(client, vector.get_value('table_format')) |
| return client.execute_async(query) |
| |
| def close_query_using_client(self, client, query): |
| return client.close_query(query) |
| |
| @execute_wrapper |
| def execute_query_async(self, query, query_options=None): |
| self.client.set_configuration(query_options) |
| return self.client.execute_async(query) |
| |
| @execute_wrapper |
| def close_query(self, query): |
| return self.client.close_query(query) |
| |
| @execute_wrapper |
| def execute_scalar(self, query, query_options=None): |
| result = self.__execute_query(self.client, query, query_options) |
| assert len(result.data) <= 1, 'Multiple values returned from scalar' |
| return result.data[0] if len(result.data) == 1 else None |
| |
  def exec_and_compare_hive_and_impala_hs2(self, stmt, compare=lambda x, y: x == y):
    """Compare Hive and Impala results when executing the same statement over HS2"""
| # execute_using_jdbc expects a Query object. Convert the query string into a Query |
| # object |
| query = Query() |
| query.query_str = stmt |
| # Run the statement targeting Hive |
| exec_opts = JdbcQueryExecConfig(impalad=HIVE_HS2_HOST_PORT, transport='SASL') |
| hive_results = execute_using_jdbc(query, exec_opts).data |
| |
| # Run the statement targeting Impala |
| exec_opts = JdbcQueryExecConfig(impalad=IMPALAD_HS2_HOST_PORT, transport='NOSASL') |
| impala_results = execute_using_jdbc(query, exec_opts).data |
| |
| # Compare the results |
| assert (impala_results is not None) and (hive_results is not None) |
| assert compare(impala_results, hive_results) |
| |
| def load_query_test_file(self, workload, file_name, valid_section_names=None, |
| encoding=None): |
| """ |
| Loads/Reads the specified query test file. Accepts the given section names as valid. |
| Uses a default list of valid section names if valid_section_names is None. |
| """ |
| test_file_path = os.path.join(WORKLOAD_DIR, workload, 'queries', file_name + '.test') |
| if not os.path.isfile(test_file_path): |
      assert False, 'Test file not found: %s' % test_file_path
| return parse_query_test_file(test_file_path, valid_section_names, encoding=encoding) |
| |
| def __drop_partitions(self, db_name, table_name): |
| """Drops all partitions in the given table""" |
| for partition in self.hive_client.get_partition_names(db_name, table_name, 0): |
| assert self.hive_client.drop_partition_by_name(db_name, table_name, \ |
| partition, True), 'Could not drop partition: %s' % partition |
| |
| @classmethod |
| def __execute_query(cls, impalad_client, query, query_options=None, user=None): |
| """Executes the given query against the specified Impalad""" |
| if query_options is not None: impalad_client.set_configuration(query_options) |
| return impalad_client.execute(query, user=user) |
| |
| def __execute_query_new_client(self, query, query_options=None, |
| use_kerberos=False): |
| """Executes the given query against the specified Impalad""" |
| new_client = self.create_impala_client() |
| new_client.set_configuration(query_options) |
| return new_client.execute(query) |
| |
| def __reset_table(self, db_name, table_name): |
| """Resets a table (drops and recreates the table)""" |
| table = self.hive_client.get_table(db_name, table_name) |
| assert table is not None |
| self.hive_client.drop_table(db_name, table_name, True) |
| self.hive_client.create_table(table) |
| |
| def clone_table(self, src_tbl, dst_tbl, recover_partitions, vector): |
| src_loc = self._get_table_location(src_tbl, vector) |
| self.client.execute("create external table {0} like {1} location '{2}'"\ |
| .format(dst_tbl, src_tbl, src_loc)) |
| if recover_partitions: |
| self.client.execute("alter table {0} recover partitions".format(dst_tbl)) |
| |
  def appx_equals(self, a, b, diff_perc):
    """Asserts that 'a' and 'b' are within 'diff_perc' percent of each other.
    'diff_perc' must be a float in [0,1]."""
    if a == b: return True  # Avoid division by 0
    assert abs(a - b) / float(max(a, b)) <= diff_perc
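    # For example: appx_equals(100, 105, 0.1) passes (~4.8% difference), while
    # appx_equals(100, 150, 0.1) trips the assertion (~33% difference).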
| |
| def _get_table_location(self, table_name, vector): |
| """ Returns the HDFS location of the table """ |
| result = self.execute_query_using_client(self.client, |
| "describe formatted %s" % table_name, vector) |
| for row in result.data: |
| if 'Location:' in row: |
| return row.split('\t')[1] |
| # This should never happen. |
| assert 0, 'Unable to get location for table: ' + table_name |
| |
| def run_stmt_in_hive(self, stmt, username=getuser()): |
| """ |
| Run a statement in Hive, returning stdout if successful and throwing |
| RuntimeError(stderr) if not. |
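
    Illustrative use (the statement is assumed):
      stdout = self.run_stmt_in_hive('show tables in functional')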
| """ |
| # When HiveServer2 is configured to use "local" mode (i.e., MR jobs are run |
| # in-process rather than on YARN), Hadoop's LocalDistributedCacheManager has a |
    # race, wherein it tries to localize jars into
    # /tmp/hadoop-$USER/mapred/local/<millis>. Two simultaneous Hive queries
    # against HS2 can conflict here. Weirdly, LocalJobRunner handles a similar issue
| # (with the staging directory) by appending a random number. To overcome this, |
| # in the case that HS2 is on the local machine (which we conflate with also |
| # running MR jobs locally), we move the temporary directory into a unique |
| # directory via configuration. This workaround can be removed when |
| # https://issues.apache.org/jira/browse/MAPREDUCE-6441 is resolved. |
| # A similar workaround is used in bin/load-data.py. |
| tmpdir = None |
| beeline_opts = [] |
| if pytest.config.option.hive_server2.startswith("localhost:"): |
| tmpdir = tempfile.mkdtemp(prefix="impala-tests-") |
| beeline_opts += ['--hiveconf', 'mapreduce.cluster.local.dir={0}'.format(tmpdir)] |
| try: |
| # Remove HADOOP_CLASSPATH from environment. Beeline doesn't need it, |
      # and doing so prevents Hadoop 3's classpath de-duplication code from
      # placing $HADOOP_CONF_DIR too late in the classpath to get the right
| # log4j configuration file picked up. Some log4j configuration files |
| # in Hadoop's jars send logging to stdout, confusing Impala's test |
| # framework. |
| env = os.environ.copy() |
| env.pop("HADOOP_CLASSPATH", None) |
| call = subprocess.Popen( |
| ['beeline', |
| '--outputformat=csv2', |
| '-u', 'jdbc:hive2://' + pytest.config.option.hive_server2, |
| '-n', username, |
| '-e', stmt] + beeline_opts, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| # Beeline in Hive 2.1 will read from stdin even when "-e" |
| # is specified; explicitly make sure there's nothing to |
| # read to avoid hanging, especially when running interactively |
| # with py.test. |
| stdin=file("/dev/null"), |
| env=env) |
| (stdout, stderr) = call.communicate() |
| call.wait() |
| if call.returncode != 0: |
| raise RuntimeError(stderr) |
| return stdout |
| finally: |
| if tmpdir is not None: shutil.rmtree(tmpdir) |
| |
| def hive_partition_names(self, table_name): |
| """Find the names of the partitions of a table, as Hive sees them. |
| |
| The return format is a list of strings. Each string represents a partition |
| value of a given column in a format like 'column1=7/column2=8'. |
| """ |
| return self.run_stmt_in_hive( |
| 'show partitions %s' % table_name).split('\n')[1:-1] |
| |
| @classmethod |
| def create_table_info_dimension(cls, exploration_strategy): |
| # If the user has specified a specific set of table formats to run against, then |
| # use those. Otherwise, load from the workload test vectors. |
| if pytest.config.option.table_formats: |
| table_formats = list() |
| for tf in pytest.config.option.table_formats.split(','): |
| dataset = get_dataset_from_workload(cls.get_workload()) |
| table_formats.append(TableFormatInfo.create_from_string(dataset, tf)) |
| tf_dimensions = ImpalaTestDimension('table_format', *table_formats) |
| else: |
| tf_dimensions = load_table_info_dimension(cls.get_workload(), exploration_strategy) |
    # If 'skip_hbase' is specified or the filesystem is isilon, s3, adls or local,
    # we don't need the hbase dimension.
| if pytest.config.option.skip_hbase or TARGET_FILESYSTEM.lower() \ |
| in ['s3', 'isilon', 'local', 'adls']: |
| for tf_dimension in tf_dimensions: |
| if tf_dimension.value.file_format == "hbase": |
| tf_dimensions.remove(tf_dimension) |
| break |
| return tf_dimensions |
| |
| @classmethod |
| def __create_exec_option_dimension(cls): |
| cluster_sizes = ALL_CLUSTER_SIZES |
| disable_codegen_options = ALL_DISABLE_CODEGEN_OPTIONS |
| batch_sizes = ALL_BATCH_SIZES |
| exec_single_node_option = [0] |
| if cls.exploration_strategy() == 'core': |
| disable_codegen_options = [False] |
| cluster_sizes = ALL_NODES_ONLY |
| return create_exec_option_dimension(cluster_sizes, disable_codegen_options, |
| batch_sizes, |
| exec_single_node_option=exec_single_node_option, |
| disable_codegen_rows_threshold_options=[0]) |
| |
| @classmethod |
| def exploration_strategy(cls): |
| default_strategy = pytest.config.option.exploration_strategy |
| if pytest.config.option.workload_exploration_strategy: |
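      # Each entry takes the form '<workload>:<strategy>', e.g.
      # 'functional-query:exhaustive' (the workload name here is illustrative).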
| workload_strategies = pytest.config.option.workload_exploration_strategy.split(',') |
| for workload_strategy in workload_strategies: |
| workload_strategy = workload_strategy.split(':') |
| if len(workload_strategy) != 2: |
          raise ValueError('Invalid workload:strategy format: %s' % workload_strategy)
| if cls.get_workload() == workload_strategy[0]: |
| return workload_strategy[1] |
| return default_strategy |