| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import pytest |
| |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal |
| |
# Number of create/cache/drop iterations each test instance performs. Every
# iteration creates one unpartitioned and one partitioned table.
NUM_TBLS_PER_THREAD = 10

# Each client will get a different test id. `range` is used instead of
# `xrange` so the module imports on both Python 2 and Python 3.
TEST_INDICES = range(10)
| |
| |
# Simple stress test for DDL operations. Attempts to create, cache,
# uncache, then drop many different tables in parallel.
class TestDdlStress(ImpalaTestSuite):
    """Stress test issuing create/insert/cache/uncache/drop statements.

    All parametrized instances of the test operate on the same database
    (SHARED_DATABASE) so that "create database" is issued repeatedly against
    one object.
    """

    # Database used by every instance of the test; it is created on demand and
    # is not dropped by this class.
    SHARED_DATABASE = 'test_ddl_stress_db'

    @classmethod
    def get_workload(cls):
        """Return the name of the workload this suite belongs to."""
        return 'targeted-stress'

    @classmethod
    def add_test_dimensions(cls):
        """Limit the test matrix to uncompressed text tables.

        The suite is skipped unless the exploration strategy is 'exhaustive',
        because of its long execution time.
        """
        super(TestDdlStress, cls).add_test_dimensions()

        if cls.exploration_strategy() != 'exhaustive':
            pytest.skip("Should only run in exhaustive due to long execution time.")

        cls.ImpalaTestMatrix.add_constraint(
            lambda v: (v.get_value('table_format').file_format == 'text' and
                       v.get_value('table_format').compression_codec == 'none'))

    @SkipIfS3.caching
    @SkipIfADLS.caching
    @SkipIfIsilon.caching
    @SkipIfLocal.caching
    @pytest.mark.stress
    @pytest.mark.parametrize('test_index', TEST_INDICES)
    def test_create_cache_many_tables(self, vector, testid_checksum, test_index):
        """Create, populate, cache, uncache and drop NUM_TBLS_PER_THREAD table pairs.

        Args:
            vector: test vector; its 'exec_option' value is applied to the client.
            testid_checksum: value embedded in the table names (presumably unique
                per test id, so instances do not collide -- confirm against the
                fixture definition).
            test_index: not read by the body; the parametrization over
                TEST_INDICES runs this test once per index.
        """
        self.client.set_configuration(vector.get_value('exec_option'))
        # Don't use unique_database so that we issue several "create database" statements
        # rather simultaneously on the same object.
        self.client.execute(
            "create database if not exists {0}".format(self.SHARED_DATABASE))

        for i in range(NUM_TBLS_PER_THREAD):
            tbl_name = "{db}.test_{checksum}_{i}".format(
                db=self.SHARED_DATABASE,
                checksum=testid_checksum,
                i=i)
            part_tbl_name = "{0}_part".format(tbl_name)

            # Because we're using a shared database, first clear potential tables from
            # any previous test runs that failed to properly clean up.
            self.client.execute("drop table if exists {0}".format(tbl_name))
            self.client.execute("drop table if exists {0}".format(part_tbl_name))

            # Create a partitioned and an unpartitioned table.
            self.client.execute("create table {0} (i int)".format(tbl_name))
            self.client.execute(
                "create table {0} (i int) partitioned by (j int)".format(part_tbl_name))

            # Add some data to each.
            self.client.execute(
                "insert overwrite table {0} select int_col from "
                "functional.alltypestiny".format(tbl_name))
            self.client.execute(
                "insert overwrite table {0} partition(j) "
                "values (1, 1), (2, 2), (3, 3), (4, 4), (4, 4)".format(part_tbl_name))

            # Cache the data in the unpartitioned table.
            self.client.execute(
                "alter table {0} set cached in 'testPool'".format(tbl_name))

            # Cache, uncache, then re-cache the data in the partitioned table.
            self.client.execute(
                "alter table {0} set cached in 'testPool'".format(part_tbl_name))
            self.client.execute(
                "alter table {0} set uncached".format(part_tbl_name))
            self.client.execute(
                "alter table {0} set cached in 'testPool'".format(part_tbl_name))

            # Drop the tables, this should remove the cache requests.
            self.client.execute("drop table {0}".format(tbl_name))
            self.client.execute("drop table {0}".format(part_tbl_name))