| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import pytest |
| import uuid |
| |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal |
| from tests.common.test_vector import TestDimension |
| |
| # Number of tables to create per thread |
| NUM_TBLS_PER_THREAD = 10 |
| |
| # Each client will get a different test id. |
| TEST_IDS = xrange(0, 10) |
| |
| # Simple stress test for DDL operations. Attempts to create, cache, |
| # uncache, then drop many different tables in parallel. |
| class TestDdlStress(ImpalaTestSuite): |
| @classmethod |
  def get_workload(cls):
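    # These tests are run as part of the 'targeted-stress' workload.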
| return 'targeted-stress' |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestDdlStress, cls).add_test_dimensions() |
| |
| if cls.exploration_strategy() != 'exhaustive': |
| pytest.skip("Should only run in exhaustive due to long execution time.") |
| |
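    # Give each client its own test_id so concurrent runs can build unique table names.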
| cls.TestMatrix.add_dimension(TestDimension('test_id', *TEST_IDS)) |
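    # Only run with the default batch_size (0).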
| cls.TestMatrix.add_constraint(lambda v: v.get_value('exec_option')['batch_size'] == 0) |
| |
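    # Only run against uncompressed text tables.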
| cls.TestMatrix.add_constraint(lambda v: |
| v.get_value('table_format').file_format == 'text' and\ |
| v.get_value('table_format').compression_codec == 'none') |
| |
| @SkipIfS3.caching |
| @SkipIfIsilon.caching |
| @SkipIfLocal.caching |
| @pytest.mark.stress |
| def test_create_cache_many_tables(self, vector): |
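    # Apply the exec options from the test vector to this client's session.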
| self.client.set_configuration(vector.get_value('exec_option')) |
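    # Create a dedicated database for the temporary tables and switch to it.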
| self.client.execute("create database if not exists ddl_stress_testdb") |
| self.client.execute("use ddl_stress_testdb") |
| |
    # Since this test runs concurrently, UUIDs generated by separate processes are not
    # guaranteed to be unique. To be safe, also include the test ID in the names of the
    # tables we create.
    test_id = vector.get_value('test_id')
| tbl_uniquifier = "proc_%s_%s" % (test_id, str(uuid.uuid4()).replace('-', '')) |
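    # Each iteration creates an unpartitioned and a partitioned table, loads some data
    # into each, exercises HDFS caching on them, and finally drops both tables.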
| for i in xrange(NUM_TBLS_PER_THREAD): |
| tbl_name = "tmp_%s_%s" % (tbl_uniquifier, i) |
      # Create an unpartitioned table and a partitioned table
| self.client.execute("create table %s (i int)" % tbl_name) |
| self.client.execute("create table %s_part (i int) partitioned by (j int)" %\ |
| tbl_name) |
| # Add some data to each |
| self.client.execute("insert overwrite table %s select int_col from "\ |
| "functional.alltypestiny" % tbl_name) |
| self.client.execute("insert overwrite table %s_part partition(j) "\ |
| "values (1, 1), (2, 2), (3, 3), (4, 4), (4, 4)" % tbl_name) |
| |
      # Cache the data in the unpartitioned table
| self.client.execute("alter table %s set cached in 'testPool'" % tbl_name) |
| # Cache, uncache, then re-cache the data in the partitioned table. |
| self.client.execute("alter table %s_part set cached in 'testPool'" % tbl_name) |
| self.client.execute("alter table %s_part set uncached" % tbl_name) |
| self.client.execute("alter table %s_part set cached in 'testPool'" % tbl_name) |
      # Drop the tables; this should remove the cache requests.
| self.client.execute("drop table %s" % tbl_name) |
| self.client.execute("drop table %s_part" % tbl_name) |