| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # py.test configuration module |
| # |
| from impala.dbapi import connect as impala_connect |
| from kudu import connect as kudu_connect |
| from random import choice, sample |
| from string import ascii_lowercase, digits |
| from zlib import crc32 |
| import contextlib |
| import logging |
| import os |
| import pytest |
| import sys |
| |
| import tests.common |
| from impala_py_lib.helpers import find_all_files, is_core_dump |
| from tests.common.environ import build_flavor_timeout |
| from common.test_result_verifier import QueryTestResult |
| from tests.common.patterns import is_valid_impala_identifier |
| from tests.comparison.db_connection import ImpalaConnection |
| from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT, WAREHOUSE |
| |
# Logger shared by the hooks and fixtures defined in this module.
LOG = logging.getLogger('test_configuration')
# Every log line starts with "--" so that it reads as a SQL comment when test output is
# pasted back into a shell.
LOG_FORMAT = "-- %(asctime)s %(levelname)-8s %(threadName)s: %(message)s"

# Defaults used by the command line options and the connection fixtures below.
DEFAULT_CONN_TIMEOUT = 45
DEFAULT_EXPLORATION_STRATEGY = 'core'
# NOTE: indexing os.environ raises KeyError at import time if HADOOP_CONF_DIR is unset.
DEFAULT_HDFS_XML_CONF = os.path.join(os.environ['HADOOP_CONF_DIR'], "hdfs-site.xml")
DEFAULT_HIVE_SERVER2 = 'localhost:11050'
DEFAULT_IMPALAD_HS2_PORT = '21050'
DEFAULT_IMPALAD_HS2_HTTP_PORT = '28000'
DEFAULT_IMPALADS = "localhost:21000,localhost:21001,localhost:21002"
# The Kudu defaults can be overridden through the environment.
DEFAULT_KUDU_MASTER_HOSTS = os.environ.get('KUDU_MASTER_HOSTS', '127.0.0.1')
DEFAULT_KUDU_MASTER_PORT = os.environ.get('KUDU_MASTER_PORT', '7051')
DEFAULT_METASTORE_SERVER = 'localhost:9083'
# Only Isilon gets a default namenode address; every other filesystem defaults to None.
DEFAULT_NAMENODE_ADDR = (
    "{node}:{port}".format(node=os.getenv("ISILON_NAMENODE"), port=ISILON_WEBHDFS_PORT)
    if FILESYSTEM == 'isilon' else None)

# Timeout each individual test case after 2 hours, or 4 hours for slow builds
PYTEST_TIMEOUT_S = build_flavor_timeout(2 * 60 * 60, slow_build_timeout=4 * 60 * 60)
| |
def pytest_configure(config):
    """Pytest startup hook: apply the per-test timeout and set up the log format.

    config: the pytest config object. Its 'option.timeout' attribute is overwritten
    with PYTEST_TIMEOUT_S.
    """
    config.option.timeout = PYTEST_TIMEOUT_S
    configure_logging()
| |
| |
def configure_logging():
    """Configure basic logging at INFO level with the module's LOG_FORMAT."""
    # LOG_FORMAT begins with "--": most tests output SQL commands, and a SQL-comment
    # prefix lets people copy-paste the test output straight into a shell when trying
    # to reproduce a failure.
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
| |
| |
def pytest_addoption(parser):
    """Register the custom command line options of the test suite with py.test.

    parser: the pytest option parser; every option is registered through
    parser.addoption(). Defaults come from the DEFAULT_* constants of this module.
    """
    parser.addoption("--exploration_strategy", default=DEFAULT_EXPLORATION_STRATEGY,
                     help="Default exploration strategy for all tests. Valid values: "
                     "core, pairwise, exhaustive.")

    parser.addoption("--workload_exploration_strategy", default=None,
                     help="Override exploration strategy for specific workloads using "
                     "the format: workload:exploration_strategy. "
                     "Ex: tpch:core,tpcds:pairwise.")

    parser.addoption("--impalad", default=DEFAULT_IMPALADS,
                     help="A comma-separated list of impalad Beeswax host:ports to "
                     "target. Note: Not all tests make use of all impalad, some tests "
                     "target just the first item in the list (it is considered the "
                     "'default').")

    parser.addoption("--impalad_hs2_port", default=DEFAULT_IMPALAD_HS2_PORT,
                     help="The impalad HiveServer2 port.")

    parser.addoption("--impalad_hs2_http_port", default=DEFAULT_IMPALAD_HS2_HTTP_PORT,
                     help="The impalad HiveServer2 HTTP port.")

    parser.addoption("--metastore_server", default=DEFAULT_METASTORE_SERVER,
                     help="The Hive Metastore server host:port to connect to.")

    parser.addoption("--hive_server2", default=DEFAULT_HIVE_SERVER2,
                     help="Hive's HiveServer2 host:port to connect to.")

    parser.addoption("--kudu_master_hosts", default=DEFAULT_KUDU_MASTER_HOSTS,
                     help="Kudu master. Can be supplied as hostname, or hostname:port.")

    parser.addoption("--minicluster_xml_conf", default=DEFAULT_HDFS_XML_CONF,
                     help="The full path to the HDFS xml configuration file")

    parser.addoption("--namenode_http_address", default=DEFAULT_NAMENODE_ADDR,
                     help="The host:port for the HDFS Namenode's WebHDFS interface. "
                     "Takes precedence over any configuration read from "
                     "--minicluster_xml_conf")

    parser.addoption("--update_results", action="store_true", default=False,
                     help="If set, will generate new results for all tests run instead "
                     "of verifying the results.")

    parser.addoption("--table_formats", dest="table_formats", default=None,
                     help="Override the test vectors and run only using the specified "
                     "table formats. Ex. --table_formats=seq/snap/block,text/none")

    # The two literals used to be joined without a separator ("...scale factorEx. ...").
    parser.addoption("--scale_factor", dest="scale_factor", default=None,
                     help="If running on a cluster, specify the scale factor. "
                     "Ex. --scale_factor=500gb")

    # KERBEROS TODO: I highly doubt that the default is correct. Try "hive".
    parser.addoption("--hive_service_name", dest="hive_service_name",
                     default="Hive Metastore Server",
                     help="The principal service name for the hive metastore client "
                     "when using kerberos.")

    parser.addoption("--use_kerberos", action="store_true", default=False,
                     help="use kerberos transport for running tests")

    parser.addoption("--use_local_catalog", dest="use_local_catalog",
                     action="store_true", default=False,
                     help="Run all tests against Impala configured with LocalCatalog.")

    parser.addoption("--sanity", action="store_true", default=False,
                     help="Runs a single test vector from each test to provide a quick "
                     "sanity check at the cost of lower test coverage.")

    parser.addoption("--skip_hbase", action="store_true", default=False,
                     help="Skip HBase tests")

    parser.addoption("--testing_remote_cluster", action="store_true", default=False,
                     help=("Indicates that tests are being run against a remote "
                           "cluster. Some tests may be marked to skip or xfail on "
                           "remote clusters."))

    parser.addoption("--shard_tests", default=None,
                     help="If set to N/M (e.g., 3/5), will split the tests into "
                     "M partitions and run the Nth partition. 1-indexed.")

    # The first two literals used to be joined without a space ("runningshell/").
    parser.addoption("--shell_executable", default=None,
                     help="Full path to the impala-shell executable. Useful for running "
                     "shell/ e2e tests against a version of the impala-shell that has "
                     "been installed as a python package in a separate virtualenv. The "
                     "python version in the target virtualenv does not need to match "
                     "the version used to execute the tests, but if the tested "
                     "environment is using python 3, be sure to also set the "
                     "environment variable USE_THRIFT11_GEN_PY=true in the test env. "
                     "(See $IMPALA_HOME/bin/set-pythonpath.sh.)")
| |
| |
def pytest_assertrepr_compare(op, left, right):
    """Provide type-specific explanations for failed assertions.

    Two comparisons are handled:
      - 'left == right' where both operands are QueryTestResult objects: the rows are
        listed pairwise (expected vs actual), plus the row counts if they differ.
      - 'left <= right' where both operands are sets: the items missing from 'right'
        are listed, followed by all the items of 'right'.

    Returns a list of strings, each of which is printed as a new line in the report on
    assertion failure, or None for every other comparison so that pytest falls back to
    its default explanation. The message is also logged at ERROR level.
    """
    # map(None, a, b) only pads the shorter sequence on Python 2; zip_longest does the
    # same thing and exists (under different names) on both major versions.
    try:
        from itertools import izip_longest as zip_longest  # Python 2
    except ImportError:
        from itertools import zip_longest  # Python 3

    if (op == "==" and isinstance(left, QueryTestResult)
            and isinstance(right, QueryTestResult)):
        result = ['Comparing QueryTestResults (expected vs actual):']
        for expected_row, actual_row in zip_longest(left.rows, right.rows):
            relation = "==" if expected_row == actual_row else "!="
            result.append("%s %s %s" % (expected_row, relation, actual_row))
        if len(left.rows) != len(right.rows):
            result.append('Number of rows returned (expected vs actual): '
                          '%d != %d' % (len(left.rows), len(right.rows)))

        # pytest has a bug/limitation where it will truncate long custom assertion
        # messages (it has a hardcoded string length limit of 80*8 characters). To
        # ensure this info isn't lost, always log the assertion message.
        LOG.error('\n'.join(result))
        return result

    # pytest supports printing the diff for a set equality check, but does not do
    # so well when we're doing a subset check. This handles that situation.
    if op == '<=' and isinstance(left, set) and isinstance(right, set):
        # If expected is not a subset of actual, print out the set difference.
        result = ['Items in expected results not found in actual results:']
        # Items are rendered with '%s' so that non-string members cannot break the
        # join() below; string members are left as they are.
        result.extend('%s' % (item,) for item in left - right)
        result.append('Items in actual results:')
        result.extend('%s' % (item,) for item in right)
        LOG.error('\n'.join(result))
        return result

    return None
| |
| |
def pytest_xdist_setupnodes(config, specs):
    """xdist hook that runs while the plugin sets up its nodes.

    Resets the verbosity option to 0 because the plugin spews useless information in
    verbose mode. 'specs' is not used.
    """
    quiet = 0
    config.option.verbose = quiet
| |
| |
def pytest_generate_tests(metafunc):
    """Parametrize tests that request the 'vector' fixture.

    For such a test the owning class is asked to add its test dimensions, and its
    ImpalaTestMatrix generates the test vectors for the exploration strategy given on
    the command line. The test is then parametrized with those vectors, using str() of
    each vector as the parameter ID. Tests without the 'vector' fixture are left alone.

    With --update_results or --sanity only the first vector is used.
    """
    if 'vector' not in metafunc.fixturenames:
        return

    metafunc.cls.add_test_dimensions()
    options = metafunc.config.option
    vectors = metafunc.cls.ImpalaTestMatrix.generate_test_vectors(
        options.exploration_strategy)

    if len(vectors) == 0:
        # __name__ works on Python 2 and 3 alike, unlike the Python-2-only func_name.
        LOG.warning("No test vectors generated for test '%s'. Check constraints and "
                    "input vectors", metafunc.function.__name__)

    # In the case this is a test result update or sanity run, select a single test
    # vector to run. This is okay for update_results because results are expected to be
    # the same for all test vectors.
    if options.update_results or options.sanity:
        vectors = vectors[0:1]

    # Build a real list: the result of map() cannot be sliced or reused on Python 3.
    vector_names = [str(vector) for vector in vectors]
    metafunc.parametrize('vector', vectors, ids=vector_names)
| |
| |
@pytest.yield_fixture
def cleanup_generated_core_dumps(request):
    """
    Remove the core dumps that a test generates on purpose (for negative testing).

    The set of core dumps is recorded before and after the decorated test function
    runs, and only the files that are new afterwards are deleted. Cores that existed
    before the test, e.g. from prior test failures that still need to be triaged, are
    retained.
    """
    def current_core_dumps():
        # Candidates are matched by file name first, then checked with is_core_dump().
        return set(path for path in find_all_files('*core*') if is_core_dump(path))

    cores_before_test = current_core_dumps()

    yield  # Wait for test to execute

    new_cores = current_core_dumps() - cores_before_test
    for core_path in new_cores:
        LOG.info("Cleaned up {core} created by {test}".format(
            core=core_path, test=request.node.name))
        os.remove(core_path)
| |
| |
@pytest.fixture
def testid_checksum(request):
    """
    Return a lowercase hex string with the CRC32 checksum of the parametrized test
    function's full pytest test ID. The full pytest ID includes relative path, module
    name, possible test class name, function name, and any parameters.

    This could be combined with some prefix in order to form identifiers unique to a
    particular test case.
    """
    # For an example of what a full pytest ID looks like, see below (written as Python
    # multi-line literal)
    #
    # ("query_test/test_cancellation.py::TestCancellationParallel::()::test_cancel_select"
    #  "[table_format: avro/snap/block | exec_option: {'disable_codegen': False, "
    #  "'abort_on_error': 1, 'exec_single_node_rows_threshold': 0, 'batch_size': 0, "
    #  "'num_nodes': 0} | query_type: SELECT | cancel_delay: 3 | action: WAIT | "
    #  "query: select l_returnflag from lineitem]")
    node_id = request.node.nodeid
    # crc32() needs bytes: text IDs (every str on Python 3, unicode on Python 2) are
    # encoded first. Python 2 byte strings pass through unchanged, so ASCII IDs keep
    # producing the same checksum as before.
    if not isinstance(node_id, bytes):
        node_id = node_id.encode('utf-8')
    # The mask yields the same unsigned value on every Python version and platform.
    return '{0:x}'.format(crc32(node_id) & 0xffffffff)
| |
| |
@pytest.fixture
def unique_database(request, testid_checksum):
    """
    Create one or more databases whose names are unique to the requesting test, hand
    the (first) name to the test, and drop the databases once the test has completed.

    The default name is the test function name joined with the testid_checksum by an
    underscore. Since the name is unique per test, tests relying on this fixture can
    run in parallel as long as they don't need exclusion on other test-local
    resources. It is a good fit for tests that create and destroy test-local databases
    or tables at run time.

    Sample usage:

      def test_something(self, vector, unique_database):
        # fixture creates database test_something_48a80f
        self.client.execute('DROP TABLE IF EXISTS `{0}`.`mytable`'.format(unique_database))
        # test does other stuff with the unique_database name as needed

    The fixture can also be parametrized:

      from tests.common.parametrize import UniqueDatabase

      @UniqueDatabase.parametrize(name_prefix='mydb', num_dbs=3, sync_ddl=True)
      def test_something(self, vector, unique_database):
        # fixture creates databases mydb_48a80f, mydb_48a80f2, mydb_48a80f3 with sync_ddl
        self.client.execute('DROP TABLE IF EXISTS `{0}`.`mytable`'.format(unique_database))
        # test does other stuff with the unique_database name as needed

    Supported parameters:

      name_prefix: string (defaults to the test function's __name__)
        - prefix of the database name

      num_dbs: integer (defaults to 1)
        - how many unique databases to create
        - the 2nd, 3rd, etc. database names are the first name with "2", "3", etc.
          appended (the first name carries no "1" suffix)

      sync_ddl: boolean (defaults to False)
        - whether the databases are created and dropped with sync_ddl

    Raises ValueError if a generated name is not a valid Impala identifier.

    For a similar DB-API 2 compliant connection/cursor that uses HS2 see the 'conn' and
    'unique_cursor' fixtures below.
    """

    # Test cases are at the function level, so no one should "accidentally" re-scope
    # this.
    assert 'function' == request.scope, (
        'This fixture must have scope "function" since the fixture must guarantee '
        'unique per-test databases.')

    # Defaults first, then whatever the parametrization overrides.
    settings = {
        "name_prefix": request.function.__name__,
        "sync_ddl": False,
        "num_dbs": 1,
    }
    overrides = getattr(request, 'param', None)
    if overrides is not None:
        for key in ("name_prefix", "sync_ddl", "num_dbs"):
            if key in overrides:
                settings[key] = overrides[key]
    sync_ddl = settings["sync_ddl"]

    first_db_name = '{0}_{1}'.format(settings["name_prefix"], testid_checksum)
    db_names = [first_db_name]
    db_names.extend(first_db_name + str(suffix)
                    for suffix in range(2, settings["num_dbs"] + 1))
    for candidate in db_names:
        if is_valid_impala_identifier(candidate):
            continue
        raise ValueError(
            'Unique database name "{0}" is not a valid Impala identifer; check test '
            'function name or any prefixes for long length or invalid '
            'characters.'.format(candidate))

    def run_ddl(statement):
        # The client is looked up on every call rather than cached.
        request.instance.execute_query_expect_success(
            request.instance.client, statement, {'sync_ddl': sync_ddl})

    def drop_database(name, must_exist):
        if_exists_clause = "" if must_exist else "IF EXISTS"
        run_ddl('DROP DATABASE {0} `{1}` CASCADE'.format(if_exists_clause, name))
        # The database directory may not be removed if there are external tables in the
        # database when it is dropped. The external locations are not removed by
        # cascade. These preexisting files/directories can cause errors when tests run
        # repeatedly or use a data snapshot (see IMPALA-9702), so this forces cleanup
        # of the database directory.
        directory = "{0}/{1}.db".format(WAREHOUSE, name).lstrip('/')
        request.instance.filesystem_client.delete_file_dir(directory, recursive=True)

    def drop_all_databases():
        # Make sure we don't try to drop the current session database
        request.instance.execute_query_expect_success(
            request.instance.client, "use default")
        for name in db_names:
            drop_database(name, True)
            LOG.info('Dropped database "{0}" for test ID "{1}"'.format(
                name, str(request.node.nodeid)))

    # The finalizer is registered before any database gets created.
    request.addfinalizer(drop_all_databases)

    for new_db in db_names:
        # Clear out leftovers of the same name before creating the database.
        drop_database(new_db, False)
        run_ddl('CREATE DATABASE `{0}`'.format(new_db))
        LOG.info('Created database "{0}" for test ID "{1}"'.format(
            new_db, str(request.node.nodeid)))
    return first_db_name
| |
| |
@pytest.fixture
def unique_role(request, testid_checksum):
    """Return a role name that is unique to the test using the fixture; the role itself
    is not created.

    The name has the form '<prefix>_<testid_checksum>_role'. The prefix is the test
    function name unless the fixture parameters contain a 'name_prefix' entry.
    """
    default_prefix = request.function.__name__
    params = getattr(request, 'param', None)
    overridden = params is not None and 'name_prefix' in params
    prefix = params['name_prefix'] if overridden else default_prefix
    return '{0}_{1}_role'.format(prefix, testid_checksum)
| |
| |
@pytest.fixture
def unique_name(request, testid_checksum):
    """Return a name that is unique to the test using the fixture.

    The name has the form '<prefix>_<testid_checksum>'. The prefix is the test function
    name unless the fixture parameters contain a 'name_prefix' entry.
    """
    prefix = request.function.__name__
    params = getattr(request, 'param', None)
    if params is None or 'name_prefix' not in params:
        return '{0}_{1}'.format(prefix, testid_checksum)
    return '{0}_{1}'.format(params['name_prefix'], testid_checksum)
| |
| |
@pytest.yield_fixture
def kudu_client():
    """Provides a new Kudu client as a pytest fixture. The client only exists for the
    duration of the method it is used in.

    The master address is taken from the --kudu_master_hosts option, given either as
    'host' or as 'host:port'; without a port, DEFAULT_KUDU_MASTER_PORT is used.

    Raises Exception if the option lists several masters (comma-separated).
    """
    # NOTE(review): the module-level pytest.config object is believed to be gone from
    # newer pytest releases -- confirm before upgrading pytest.
    kudu_master = pytest.config.option.kudu_master_hosts

    if "," in kudu_master:
        raise Exception("Multi-master not supported yet")
    if ":" in kudu_master:
        host, port = kudu_master.split(":")
    else:
        host, port = kudu_master, DEFAULT_KUDU_MASTER_PORT
    client = kudu_connect(host, port)
    try:
        yield client
    finally:
        # Closing is best effort: a failure is logged, never raised into the test run.
        try:
            client.close()
        except Exception as e:
            LOG.warning("Error closing Kudu client: %s", e)
| |
| |
@pytest.yield_fixture(scope="class")
def conn(request):
    """Provides a new DB-API compliant connection to Impala as a pytest fixture. One
    connection is shared by all the test methods of a class. The class may customize
    the connection by declaring any of these methods:
      - get_db_name(): The name of the database to connect to.
      - auto_create_db(): If declared and the method returns True, the database will
            be created before tests run and dropped afterwards. If a database name is
            provided by get_db_name(), it must not exist. Classes that use both
            auto_create_db() and get_db_name() should generate a random name in
            get_db_name() and cache it.
      - get_conn_timeout(): The timeout, in seconds, to use for this connection;
            DEFAULT_CONN_TIMEOUT applies if the method is missing or returns a falsy
            value.
    The returned connection will have a 'db_name' property.

    DEPRECATED:
      See the 'unique_database' fixture above to use Impala's custom python
      API instead of DB-API.
    """
    test_cls = request.cls
    db_name = __call_cls_method_if_exists(test_cls, "get_db_name")
    wants_fresh_db = __call_cls_method_if_exists(test_cls, "auto_create_db")
    timeout = __call_cls_method_if_exists(test_cls, "get_conn_timeout")
    if not timeout:
        timeout = DEFAULT_CONN_TIMEOUT

    # Both helpers are context managers accepting the same keyword arguments.
    open_connection = __unique_conn if wants_fresh_db else __auto_closed_conn
    with open_connection(db_name=db_name, timeout=timeout) as connection:
        yield connection
| |
| |
| def __call_cls_method_if_exists(cls, method_name): |
| """Returns the result of calling the method 'method_name' on class 'class' if the class |
| defined such a method, otherwise returns None. |
| """ |
| method = getattr(cls, method_name, None) |
| if method: |
| return method() |
| |
| |
@contextlib.contextmanager
def __unique_conn(db_name=None, timeout=DEFAULT_CONN_TIMEOUT):
    """Connects to Impala and creates a new database, then returns a connection to it.
    This is intended to be used in a "with" block. Upon exit, the database will be
    dropped. A database name can be provided by 'db_name', a database by that name
    must not exist prior to calling this method. Without 'db_name', a random six
    character name (a lowercase letter followed by five lowercase letters or digits)
    is generated.

      with __unique_conn() as conn:
        # Use conn
      # The database no longer exists and the conn is closed.

    The returned connection will have a 'db_name' property.

    Errors while dropping the database on exit are logged as warnings and not raised.

    DEPRECATED:
      See the 'unique_database' fixture above to use Impala's custom python
      API instead of DB-API.
    """
    if not db_name:
        db_name = choice(ascii_lowercase) + "".join(sample(ascii_lowercase + digits, 5))
    # A first connection without a database is only used to create the database.
    with __auto_closed_conn(timeout=timeout) as setup_conn:
        with __auto_closed_cursor(setup_conn) as cur:
            cur.execute("CREATE DATABASE %s" % db_name)
    with __auto_closed_conn(db_name=db_name, timeout=timeout) as conn:
        try:
            yield conn
        finally:
            try:
                with __auto_closed_cursor(conn) as cur:
                    try:
                        # Switch away first so the database being dropped is not the
                        # session's current one.
                        cur.execute("USE DEFAULT")
                        cur.execute("DROP DATABASE IF EXISTS %s CASCADE" % db_name)
                    except Exception as e:
                        LOG.warning("Error dropping database: %s", e)
            except Exception as e:
                LOG.warning("Error creating a cursor: %s", e)
| |
| |
@contextlib.contextmanager
def __auto_closed_conn(db_name=None, timeout=DEFAULT_CONN_TIMEOUT):
    """Returns a connection to Impala. This is intended to be used in a "with" block.
    The connection will be closed upon exiting the block.

    The connection targets the host of the first entry of the --impalad option and the
    port given by --impalad_hs2_port. 'db_name' is passed on as the database to connect
    to and 'timeout' as the connection timeout.

    The returned connection will have a 'db_name' property.

    An error while closing the connection is logged as a warning and not raised.

    DEPRECATED:
      See the 'unique_database' fixture above to use Impala's custom python
      API instead of DB-API.
    """
    options = pytest.config.option
    # Only the host part of the first 'host:port' entry is needed.
    default_impalad = options.impalad.split(',')[0]
    impalad_host = default_impalad.split(':')[0]
    hs2_port = options.impalad_hs2_port

    conn = impala_connect(host=impalad_host, port=hs2_port, database=db_name,
                          timeout=timeout)
    try:
        conn.db_name = db_name
        yield conn
    finally:
        try:
            conn.close()
        except Exception as e:
            LOG.warning("Error closing Impala connection: %s", e)
| |
| |
@pytest.yield_fixture
def cursor(conn):
    """Provides a new DB-API compliant cursor, opened on the connection that the conn()
    fixture provides. The cursor lives only as long as the test method using it.

    The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
    property.

    DEPRECATED:
      See the 'unique_database' fixture above to use Impala's custom python
      API instead of DB-API.
    """
    cursor_scope = __auto_closed_cursor(conn)
    with cursor_scope as method_cursor:
        yield method_cursor
| |
| |
@pytest.yield_fixture(scope="class")
def cls_cursor(conn):
    """Provides a new DB-API compliant cursor, opened on the connection that the conn()
    fixture provides. The cursor lives as long as the test class using it.

    The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
    property.

    DEPRECATED:
      See the 'unique_database' fixture above to use Impala's custom python
      API instead of DB-API.
    """
    cursor_scope = __auto_closed_cursor(conn)
    with cursor_scope as class_cursor:
        yield class_cursor
| |
| |
@pytest.yield_fixture
def unique_cursor():
    """Provides a new DB-API compliant cursor to a newly created Impala database. The
    cursor lives only as long as the test method using it, and the database is dropped
    after the test executes.

    The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
    property.

    DEPRECATED:
      See the 'unique_database' fixture above to use Impala's custom python
      API instead of DB-API.
    """
    # The cursor is closed first on exit, then the connection's database is dropped.
    with __unique_conn() as fresh_conn, __auto_closed_cursor(fresh_conn) as fresh_cursor:
        yield fresh_cursor
| |
| |
| @contextlib.contextmanager |
| def __auto_closed_cursor(conn): |
| """Returns a cursor created from conn. This is intended to be used in a "with" block. |
| The cursor will be closed upon exiting the block. |
| """ |
| cursor = conn.cursor() |
| cursor.conn = conn |
| try: |
| yield cursor |
| finally: |
| try: |
| cursor.close() |
| except Exception as e: |
| LOG.warn("Error closing Impala cursor: %s", e) |
| |
| |
@pytest.yield_fixture
def impala_testinfra_cursor():
    """
    Provide a cursor obtained from a tests.comparison.db_connection.ImpalaConnection.
    Used for "tests of tests" for the infra for the query generator, stress test, etc.
    The cursor is closed once the test is done with it.
    """
    # Unlike the cursor fixtures above, which return direct Impyla cursors, this one
    # serves tests that want to interact with the objects in
    # tests.comparison.db_connection, which need testing.
    with ImpalaConnection() as infra_conn, \
            contextlib.closing(infra_conn.cursor()) as infra_cursor:
        yield infra_cursor
| |
| |
@pytest.fixture(autouse=True, scope='session')
def validate_pytest_config():
    """
    Check once per session that the pytest command line options make sense together.

    --testing_remote_cluster is rejected when the --impalad value starts with a local
    address: the error is logged and the pytest run is ended via pytest.exit().
    """
    options = pytest.config.option
    if not options.testing_remote_cluster:
        return

    local_prefixes = ('localhost', '127.', '0.0.0.0')
    # str.startswith() accepts a tuple and is true if any of the prefixes matches.
    if options.impalad.startswith(local_prefixes):
        logging.error("--testing_remote_cluster can not be used with a local impalad")
        pytest.exit("Invalid pytest config option: --testing_remote_cluster")
| |
| |
@pytest.yield_fixture(autouse=True, scope='session')
def cluster_properties():
    """Provide the ImpalaTestClusterProperties instance for the test session."""
    # Imported here rather than at the top level to avoid a circular dependency between
    # conftest and tests.common.environ, which uses command-line flags set up by
    # conftest.
    from tests.common.environ import ImpalaTestClusterProperties
    yield ImpalaTestClusterProperties.get_instance()
| |
| |
@pytest.fixture(autouse=True, scope='session')
def validate_python_version():
    """Verify the version of the Python runtime before any test runs. Impala switched
    to the toolchain Python, which is at least v2.7, so the tests will not run on
    anything older than that.
    """
    minimum_version = (2, 7)
    running_version = sys.version_info
    assert running_version > minimum_version, "Tests only support Python 2.7+"
| |
| |
@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(items, config, session):
    """Hook to handle the --shard_tests command line option.

    If the option is set to N/M, the collected tests are split into M buckets by a
    checksum of their node ID, and every test outside of this run's bucket is
    "deselected". 'items' is modified in place. Nothing happens without the option.

    Raises ValueError if the option is not of the form N/M with M > 0 and 0 <= N <= M.
    """
    shard_spec = config.option.shard_tests
    if not shard_spec:
        return

    num_items = len(items)
    try:
        this_shard, num_shards = [int(part) for part in shard_spec.split("/")]
    except ValueError:
        raise ValueError(
            "--shard_tests must have the form N/M (e.g. 3/5), got %r" % (shard_spec,))
    # Explicit checks instead of an assert: asserts disappear under 'python -O', and
    # M == 0 used to surface as a ZeroDivisionError further down.
    if num_shards <= 0 or not 0 <= this_shard <= num_shards:
        raise ValueError(
            "--shard_tests N/M requires M > 0 and 0 <= N <= M, got %r" % (shard_spec,))
    # Shard M is kept in bucket 0; all other shard numbers map to themselves.
    this_shard %= num_shards

    def bucket_of(nodeid):
        # The builtin hash() of a string may differ from one interpreter process to
        # the next (hash randomization), which would make separately started shards
        # disagree about the partitioning. CRC32 is stable across processes.
        if not isinstance(nodeid, bytes):
            nodeid = nodeid.encode('utf-8')
        return (crc32(nodeid) & 0xffffffff) % num_shards

    items_selected, items_deselected = [], []
    for item in items:
        if bucket_of(item.nodeid) == this_shard:
            items_selected.append(item)
        else:
            items_deselected.append(item)
    config.hook.pytest_deselected(items=items_deselected)

    # We must modify the items list in place for it to take effect.
    items[:] = items_selected

    logging.info(
        "pytest shard selection enabled %s. Of %d items, selected %d items by hash.",
        config.option.shard_tests, num_items, len(items))
| |
| |
@pytest.hookimpl(trylast=True)
def pytest_runtest_logstart(nodeid, location):
    """Store a sanitized copy of the starting test's node ID in
    tests.common.current_node. 'location' is not used.
    """
    # Beeswax doesn't support commas or equals in configuration, so they are replaced.
    # Spaces are removed to make the string a little bit shorter.
    sanitized = nodeid
    for unsupported, substitute in ((",", ";"), (" ", ""), ("=", "-")):
        sanitized = sanitized.replace(unsupported, substitute)
    # The string is shortened so that it is entirely spit out by ThriftDebugString,
    # rather than being elided.
    tests.common.current_node = sanitized[:255]