# blob: d91163d2ff8ed4aecb70c35bec210631c900e391 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import re
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_vector import ImpalaTestDimension
# Number of times to execute each query.
NUM_ITERATIONS = 5

# Each client will get a different test id. `range` is used instead of the Python 2-only
# `xrange` so this module can be imported under both Python 2 and Python 3; with only
# ten ids, materializing the sequence on Python 2 costs nothing noticeable.
TEST_IDS = range(0, 10)
# Runs many queries in parallel. The goal is to have this complete in a reasonable amount
# of time and be run as part of all test runs.
class TestMiniStress(ImpalaTestSuite):
    """Small stress workloads that run many queries in parallel.

    Every value in TEST_IDS becomes one value of the 'test_id' test dimension, so each
    test method below is instantiated once per test id.
    """

    @classmethod
    def get_workload(cls):
        """Return the name of the workload these tests belong to."""
        return 'targeted-stress'

    @classmethod
    def add_test_dimensions(cls):
        """Add the 'test_id' dimension and restrict which batch sizes are exercised."""
        super(TestMiniStress, cls).add_test_dimensions()
        cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('test_id', *TEST_IDS))
        if cls.exploration_strategy() != 'exhaustive':
            # Outside of exhaustive runs, keep only vectors with batch_size == 0.
            cls.ImpalaTestMatrix.add_constraint(
                lambda v: v.get_value('exec_option')['batch_size'] == 0)
        else:
            # Exhaustive runs keep every batch size except 1.
            cls.ImpalaTestMatrix.add_constraint(
                lambda v: v.get_value('exec_option')['batch_size'] != 1)

    @pytest.mark.xfail(run=False, reason="IMPALA-2605: the stress tests have a history of "
                       "causing the end-to-end tests to hang")
    @pytest.mark.stress
    def test_mini_stress(self, vector):
        """Run the 'stress' test case NUM_ITERATIONS times with the given vector."""
        for _ in range(NUM_ITERATIONS):
            self.run_test_case('stress', vector)

    @pytest.mark.xfail(run=False, reason="IMPALA-2605: the stress tests have a history of "
                       "causing the end-to-end tests to hang")
    @pytest.mark.stress
    def test_sort_stress(self, vector):
        """Run the 'sort_stress' test case on parquet tables with a 200m memory limit.

        Skipped under the 'core' exploration strategy and for any file format other
        than parquet.
        """
        if self.exploration_strategy() == 'core':
            pytest.skip("Skipping sort stress tests for core")
        if vector.get_value('table_format').file_format != 'parquet':
            pytest.skip("skipping file format")
        # These query options are set on the vector's own exec_option mapping before
        # the test case is run with it.
        exec_option = vector.get_value('exec_option')
        exec_option['disable_outermost_topn'] = 1
        exec_option['mem_limit'] = "200m"
        self.run_test_case('sort_stress', vector)

    @pytest.mark.skipif(True,
                        reason="Skip until the race in the catalog server is resolved")
    @pytest.mark.stress
    def test_run_invalidate_refresh(self, vector):
        """Verifies that running concurrent invalidate table/catalog and refresh commands
        doesn't cause failures with other running workloads and ensures catalog versions
        are strictly increasing."""
        target_db = self.execute_scalar('select current_database()', vector=vector)
        impala_cluster = ImpalaCluster()
        impalad = impala_cluster.impalads[0].service
        catalogd = impala_cluster.catalogd.service

        for _ in range(NUM_ITERATIONS):
            # Get the catalog versions for the table before running the workload.
            before_versions = {
                'catalogd': self.get_table_version(catalogd, target_db, 'lineitem'),
                'impalad': self.get_table_version(impalad, target_db, 'lineitem'),
            }

            self.run_test_case('stress-with-invalidate-refresh', vector)

            # Get the catalog versions for the table after running the workload.
            after_versions = {
                'catalogd': self.get_table_version(catalogd, target_db, 'lineitem'),
                'impalad': self.get_table_version(impalad, target_db, 'lineitem'),
            }

            # Catalog versions should be strictly increasing.
            assert before_versions['impalad'] < after_versions['impalad']
            assert before_versions['catalogd'] < after_versions['catalogd']

    def get_table_version(self, impala_service, db_name, tbl_name):
        """Gets the given table's catalog version using the given impalad/catalogd service.

        The version is parsed out of the catalog object dump returned by
        `impala_service` for `db_name.tbl_name` and returned as an int. The assertion
        fails if the dump does not contain a catalog version.
        """
        obj_dump = impala_service.get_catalog_object_dump('table', db_name + '.' + tbl_name)
        result = re.search(r'catalog_version \(i64\) = (\d+)', obj_dump)
        assert result, 'Unable to find catalog version in object dump: ' + obj_dump
        return int(result.group(1))