blob: ab2c627fa2f4a42bc807a666fbf8ae15b2c0f4d5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Superclass for all tests that need a custom cluster.
# TODO: Configure cluster size and other parameters.
import logging
import os
import os.path
import pipes
import pytest
import subprocess
from subprocess import check_call
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import IS_LOCAL
from time import sleep
# Root directory under which the cluster and service scripts used below are located
# (e.g. bin/start-impala-cluster.py). Read once at import time, so importing this module
# raises KeyError if IMPALA_HOME is not set in the environment.
IMPALA_HOME = os.environ['IMPALA_HOME']
# Value used for --cluster_size when a test does not request a specific size.
DEFAULT_CLUSTER_SIZE = 3
# Default value for --num_coordinators; same as the default cluster size.
NUM_COORDINATORS = DEFAULT_CLUSTER_SIZE
# Keys under which CustomClusterTestSuite.with_args() records per-test settings on the
# decorated test method. The first group holds extra command line arguments for the
# respective daemons.
IMPALAD_ARGS = "impalad_args"
STATESTORED_ARGS = "state_store_args"
CATALOGD_ARGS = "catalogd_args"
KUDU_ARGS = "kudu_args"
# Extra arguments handed to the start-impala-cluster script itself.
START_ARGS = "start_args"
SENTRY_CONFIG = "sentry_config"
SENTRY_LOG_DIR = "sentry_log_dir"
HIVE_CONF_DIR = "hive_conf_dir"
CLUSTER_SIZE = "cluster_size"
# Default query options for the impala daemons. These are kept apart from the other
# impalad arguments so that several sources of defaults can be merged into one list.
DEFAULT_QUERY_OPTIONS = "default_query_options"
IMPALA_LOG_DIR = "impala_log_dir"
NUM_EXCLUSIVE_COORDINATORS = "num_exclusive_coordinators"
STATESTORED_TIMEOUT_S = "statestored_timeout_s"
IMPALAD_TIMEOUT_S = "impalad_timeout_s"
# Fast topic updates are used by default to shorten the time until the first query can
# run. The flags are joined with single spaces into one argument string.
DEFAULT_STATESTORE_ARGS = " ".join((
    "--statestore_update_frequency_ms=50",
    "--statestore_priority_update_frequency_ms=50",
    "--statestore_heartbeat_frequency_ms=50",
))
class CustomClusterTestSuite(ImpalaTestSuite):
  """Every test in a test suite deriving from this class gets its own Impala cluster.
  Custom arguments may be passed to the cluster by using the @with_args decorator."""

  @classmethod
  def get_workload(cls):
    """Returns the name of the workload used by custom cluster tests."""
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    """Adds the base class test dimensions and then the custom cluster constraints."""
    super(CustomClusterTestSuite, cls).add_test_dimensions()
    cls.add_custom_cluster_constraints()

  @classmethod
  def add_custom_cluster_constraints(cls):
    """Defines constraints for custom cluster tests, called by add_test_dimensions.

    By default, custom cluster tests only run on text/none and with a limited set of
    exec options. Subclasses may override this to relax these default constraints."""
    # NOTE(review): add_test_dimensions() above has already made this call when this
    # method is reached through it. It is kept because subclasses may invoke this method
    # directly - confirm before removing.
    super(CustomClusterTestSuite, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'text' and
        v.get_value('table_format').compression_codec == 'none')
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('exec_option')['batch_size'] == 0 and
        v.get_value('exec_option')['disable_codegen'] == False and
        v.get_value('exec_option')['num_nodes'] == 0)

  @classmethod
  def setup_class(cls):
    # Explicit override of ImpalaTestSuite.setup_class(). For custom cluster, the
    # ImpalaTestSuite.setup_class() procedure needs to happen on a per-method basis.
    # IMPALA-3614: @SkipIfLocal.multiple_impalad workaround
    # IMPALA-2943 TODO: When pytest is upgraded, see if this explicit skip can be
    # removed in favor of the class-level SkipifLocal.multiple_impalad decorator.
    if IS_LOCAL:
      pytest.skip("multiple impalads needed")

  @classmethod
  def teardown_class(cls):
    # Explicit override of ImpalaTestSuite.teardown_class(). For custom cluster, the
    # ImpalaTestSuite.teardown_class() procedure needs to happen on a per-method basis.
    pass

  @staticmethod
  def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
      start_args=None, sentry_config=None, default_query_options=None,
      impala_log_dir=None, sentry_log_dir=None, hive_conf_dir=None, cluster_size=None,
      num_exclusive_coordinators=None, kudu_args=None, statestored_timeout_s=None,
      impalad_timeout_s=None):
    """Records arguments to be passed to a cluster by adding them to the decorated
    method's func_dict.

    Only arguments that are not None are recorded, each under its module-level key
    constant. 'start_args' is split on whitespace and stored as a list. Returns a
    decorator that annotates the function and returns it unchanged otherwise."""
    def decorate(func):
      # Every value is recorded only if it was supplied. This includes
      # 'statestored_args': recording None for it would make setup_method() pass a
      # literal '--state_store_args=None ' to the cluster start script.
      recorded = [
          (IMPALAD_ARGS, impalad_args),
          (STATESTORED_ARGS, statestored_args),
          (CATALOGD_ARGS, catalogd_args),
          (START_ARGS, None if start_args is None else start_args.split()),
          (SENTRY_CONFIG, sentry_config),
          (SENTRY_LOG_DIR, sentry_log_dir),
          (HIVE_CONF_DIR, hive_conf_dir),
          (KUDU_ARGS, kudu_args),
          (DEFAULT_QUERY_OPTIONS, default_query_options),
          (IMPALA_LOG_DIR, impala_log_dir),
          (CLUSTER_SIZE, cluster_size),
          (NUM_EXCLUSIVE_COORDINATORS, num_exclusive_coordinators),
          (STATESTORED_TIMEOUT_S, statestored_timeout_s),
          (IMPALAD_TIMEOUT_S, impalad_timeout_s),
      ]
      for key, value in recorded:
        if value is not None:
          func.func_dict[key] = value
      return func
    return decorate

  def setup_method(self, method):
    """Starts the services and the Impala cluster requested for 'method'.

    Reads the settings recorded by with_args() from method.func_dict, (re)starts Hive,
    Kudu and Sentry if the test asked for them, starts a fresh Impala cluster and finally
    runs the base class setup_class() procedure."""
    test_args = method.func_dict
    cluster_args = list()
    for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS]:
      if arg in test_args:
        cluster_args.append("--%s=%s " % (arg, test_args[arg]))
    if START_ARGS in test_args:
      cluster_args.extend(test_args[START_ARGS])

    if HIVE_CONF_DIR in test_args:
      self._start_hive_service(test_args[HIVE_CONF_DIR])
      # Should let Impala adopt the same hive-site.xml. The only way is to add it in the
      # beginning of the CLASSPATH. Because there's already a hive-site.xml in the
      # default CLASSPATH (see bin/set-classpath.sh).
      cluster_args.append(
          '--env_vars=CUSTOM_CLASSPATH=%s ' % test_args[HIVE_CONF_DIR])

    if KUDU_ARGS in test_args:
      self._restart_kudu_service(test_args[KUDU_ARGS])

    if SENTRY_CONFIG in test_args:
      self._start_sentry_service(test_args[SENTRY_CONFIG],
          test_args.get(SENTRY_LOG_DIR))

    cluster_size = test_args.get(CLUSTER_SIZE, DEFAULT_CLUSTER_SIZE)

    # Unless exclusive coordinators were requested, every impalad is a coordinator.
    use_exclusive_coordinators = NUM_EXCLUSIVE_COORDINATORS in test_args
    if use_exclusive_coordinators:
      num_coordinators = test_args[NUM_EXCLUSIVE_COORDINATORS]
    else:
      num_coordinators = cluster_size

    # Start a clean new cluster before each test
    kwargs = {
      "cluster_size": cluster_size,
      "num_coordinators": num_coordinators,
      "expected_num_executors": cluster_size,
      "default_query_options": test_args.get(DEFAULT_QUERY_OPTIONS),
      "use_exclusive_coordinators": use_exclusive_coordinators
    }
    # Optional settings are only forwarded when present so that the defaults of
    # _start_impala_cluster() apply otherwise.
    for key, kwarg_name in [(IMPALA_LOG_DIR, "impala_log_dir"),
                            (STATESTORED_TIMEOUT_S, "statestored_timeout_s"),
                            (IMPALAD_TIMEOUT_S, "impalad_timeout_s")]:
      if key in test_args:
        kwargs[kwarg_name] = test_args[key]

    self._start_impala_cluster(cluster_args, **kwargs)
    super(CustomClusterTestSuite, self).setup_class()

  def teardown_method(self, method):
    """Restores the default Hive service if 'method' used a custom Hive configuration
    and runs the base class teardown_class() procedure."""
    if HIVE_CONF_DIR in method.func_dict:
      self._start_hive_service(None)  # Restart Hive Service using default configs
    super(CustomClusterTestSuite, self).teardown_class()

  @classmethod
  def _stop_impala_cluster(cls):
    """Runs start-impala-cluster.py with --kill_only, sleeping 2s before and after."""
    # TODO: Figure out a better way to handle case where processes are just starting
    # / cleaning up so that sleeps are not needed.
    sleep(2)
    check_call([os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'), '--kill_only'])
    sleep(2)

  @classmethod
  def _run_service_command(cls, command, env, failure_message):
    """Runs 'command' via '/bin/bash -c' with environment 'env' and waits for it.
    Raises RuntimeError(failure_message) if the exit status is not 0."""
    if subprocess.call(['/bin/bash', '-c', command], env=env) != 0:
      raise RuntimeError(failure_message)

  @classmethod
  def _restart_kudu_service(cls, kudu_args=None):
    """Runs 'testdata/cluster/admin restart kudu'. If 'kudu_args' is not None it is
    passed to the command through the IMPALA_KUDU_STARTUP_FLAGS environment variable.
    Raises RuntimeError if the command fails."""
    kudu_env = dict(os.environ)
    if kudu_args is not None:
      kudu_env["IMPALA_KUDU_STARTUP_FLAGS"] = kudu_args
    cls._run_service_command(
        os.path.join(IMPALA_HOME, 'testdata/cluster/admin restart kudu'),
        kudu_env, "Unable to restart Kudu")

  @classmethod
  def _start_sentry_service(cls, sentry_service_config, sentry_log_dir=None):
    """Runs testdata/bin/run-sentry-service.sh with SENTRY_SERVICE_CONFIG set to
    'sentry_service_config' and, if given, SENTRY_LOG_DIR set to 'sentry_log_dir'.
    Raises RuntimeError if the script fails."""
    sentry_env = dict(os.environ)
    if sentry_log_dir is not None:
      sentry_env['SENTRY_LOG_DIR'] = sentry_log_dir
    sentry_env['SENTRY_SERVICE_CONFIG'] = sentry_service_config
    cls._run_service_command(
        os.path.join(IMPALA_HOME, 'testdata/bin/run-sentry-service.sh'),
        sentry_env, "Unable to start Sentry")

  @classmethod
  def _stop_sentry_service(cls):
    """Runs testdata/bin/kill-sentry-service.sh; raises CalledProcessError on failure."""
    subprocess.check_call([os.path.join(IMPALA_HOME,
                                        "testdata/bin/kill-sentry-service.sh")],
                          close_fds=True)

  @classmethod
  def _start_hive_service(cls, hive_conf_dir):
    """Runs testdata/bin/run-hive-server.sh. If 'hive_conf_dir' is not None it is
    exported to the script as HIVE_CONF_DIR; otherwise the inherited environment is
    used unchanged. Raises RuntimeError if the script fails."""
    hive_env = dict(os.environ)
    if hive_conf_dir is not None:
      hive_env['HIVE_CONF_DIR'] = hive_conf_dir
    cls._run_service_command(
        os.path.join(IMPALA_HOME, 'testdata/bin/run-hive-server.sh'),
        hive_env, "Unable to start Hive")

  @classmethod
  def _stop_hive_service(cls):
    """Runs testdata/bin/kill-hive-server.sh; raises CalledProcessError on failure."""
    subprocess.check_call([os.path.join(IMPALA_HOME,
                                        "testdata/bin/kill-hive-server.sh")],
                          close_fds=True)

  @classmethod
  def _start_impala_cluster(cls,
                            options,
                            impala_log_dir=os.getenv('LOG_DIR', "/tmp/"),
                            cluster_size=DEFAULT_CLUSTER_SIZE,
                            num_coordinators=NUM_COORDINATORS,
                            use_exclusive_coordinators=False,
                            add_executors=False,
                            log_level=1,
                            expected_num_executors=DEFAULT_CLUSTER_SIZE,
                            expected_subscribers=0,
                            default_query_options=None,
                            statestored_timeout_s=60,
                            impalad_timeout_s=60):
    """Starts an Impala cluster with start-impala-cluster.py and waits until it is up.

    options: sequence of extra command line arguments appended to the start command.
      It is not modified.
    impala_log_dir: passed as --log_dir and stored in cls.impala_log_dir.
    cluster_size, num_coordinators, log_level: passed as the flags of the same name.
    use_exclusive_coordinators, add_executors: add the flag of the same name if true.
    expected_num_executors: number of live backends each impalad is waited on to know.
    expected_subscribers: number of statestore subscribers to wait for; 0 means
      expected_num_executors + 1.
    default_query_options: optional sequence of (key, value) pairs that are passed to
      the impalads as --default_query_options.
    statestored_timeout_s, impalad_timeout_s: timeouts for the respective waits.

    Raises CalledProcessError if the start script fails and Exception if no statestored
    is found afterwards. cls.cluster is set in either case."""
    cls.impala_log_dir = impala_log_dir
    # Work on a copy so that the caller's list is not extended on every call.
    options = list(options)
    # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that
    # certain custom startup arguments work and we want to keep them independent of dev
    # environments.
    cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
           '--state_store_args=%s' % DEFAULT_STATESTORE_ARGS,
           '--cluster_size=%d' % cluster_size,
           '--num_coordinators=%d' % num_coordinators,
           '--log_dir=%s' % impala_log_dir,
           '--log_level=%s' % log_level]

    if use_exclusive_coordinators:
      cmd.append("--use_exclusive_coordinators")

    if add_executors:
      cmd.append("--add_executors")

    if pytest.config.option.use_local_catalog:
      cmd.append("--impalad_args=--use_local_catalog=1")
      cmd.append("--catalogd_args=--catalog_topic_mode=minimal")

    default_query_option_kvs = []
    # Put any defaults first, then any arguments after that so they can override defaults.
    if os.environ.get("ERASURE_CODING") == "true":
      default_query_option_kvs.append(("allow_erasure_coded_files", "true"))
    if default_query_options is not None:
      default_query_option_kvs.extend(default_query_options)
    # Add the default query options after any arguments. This will override any default
    # options set in --impalad_args by design to force tests to pass default_query_options
    # into this function directly.
    options.append("--impalad_args=--default_query_options={0}".format(
        ','.join(["{0}={1}".format(k, v) for k, v in default_query_option_kvs])))

    full_cmd = cmd + options
    logging.info("Starting cluster with command: %s",
                 " ".join(pipes.quote(arg) for arg in full_cmd))
    try:
      check_call(full_cmd, close_fds=True)
    finally:
      # Failure tests expect cluster to be initialised even if start-impala-cluster fails.
      cls.cluster = ImpalaCluster.get_e2e_test_cluster()

    statestored = cls.cluster.statestored
    if statestored is None:
      raise Exception("statestored was not found")

    # The number of statestore subscribers is
    # cluster_size (# of impalad) + 1 (for catalogd).
    if expected_subscribers == 0:
      expected_subscribers = expected_num_executors + 1

    statestored.service.wait_for_live_subscribers(expected_subscribers,
                                                  timeout=statestored_timeout_s)
    for impalad in cls.cluster.impalads:
      impalad.service.wait_for_num_known_live_backends(expected_num_executors,
                                                       timeout=impalad_timeout_s)