# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Test Catalog behavior when HMS is not present

import os
import pytest
from subprocess import check_call
import time

from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import (
    CustomClusterTestSuite,
    NUM_SUBSCRIBERS)
from tests.util.filesystem_utils import IS_ISILON, IS_LOCAL


class TestCatalogHMSFailures(CustomClusterTestSuite):
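  """Tests catalogd behavior when the Hive Metastore (HMS) is down or is started after
  catalogd (IMPALA-4278)."""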

  @classmethod
  def setup_class(cls):
    super(TestCatalogHMSFailures, cls).setup_class()

  @classmethod
  def run_hive_server(cls):
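    """Runs testdata/bin/run-hive-server.sh to start Hive. On local and Isilon
    filesystems only the metastore is started."""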
    script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh')
    run_cmd = [script]
    if IS_LOCAL or IS_ISILON:
      run_cmd.append('-only_metastore')
    check_call(run_cmd, close_fds=True)

  @classmethod
  def cleanup_process(cls, proc):
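    """Kills 'proc' and waits for it to exit, ignoring errors (e.g. if the process has
    already terminated)."""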
    try:
      proc.kill()
    except Exception:
      pass
    try:
      proc.wait()
    except Exception:
      pass

  @classmethod
  def teardown_class(cls):
    # Make sure the metastore is running even if a test aborted at an unexpected point
    # before it could restart the metastore itself.
    cls.run_hive_server()
    super(TestCatalogHMSFailures, cls).teardown_class()

  @classmethod
  def reload_metadata(cls, client):
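    """Invalidates all metadata and then runs a statement that needs it, verifying that
    the catalog is usable."""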
    client.execute('invalidate metadata')
    client.execute('show databases')

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=120')
  def test_kill_hms_after_catalog_init(self, vector):
    """IMPALA-4278: If HMS dies after catalogd initialization, SQL statements that force
    a metadata load should fail quickly. After HMS is restarted, metadata loading should
    work again."""
    # Make sure that catalogd is connected to HMS.
    impalad = self.cluster.get_any_impalad()
    client = impalad.service.create_beeswax_client()
    self.reload_metadata(client)

    # Kill Hive.
    kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
    check_call([kill_cmd], close_fds=True)

    # Metadata load should fail quickly.
    start = time.time()
    try:
      self.reload_metadata(client)
    except ImpalaBeeswaxException as e:
      assert "Connection refused" in str(e)
    else:
      assert False, "Metadata load should have failed"
    end = time.time()
    assert end - start < 30, "Metadata load did not fail quickly enough"

    # Start Hive again.
    self.run_hive_server()

    # Metadata load should work now.
    self.reload_metadata(client)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=120')
  def test_start_catalog_before_hms(self, vector):
    """IMPALA-4278: If catalogd is started with initial_hms_cnxn_timeout_s set to a
    value greater than the HMS startup time, it establishes a connection to HMS even if
    HMS is started a little later."""
    # Make sure that catalogd is connected to HMS.
    impalad = self.cluster.get_any_impalad()
    client = impalad.service.create_beeswax_client()
    self.reload_metadata(client)

    # Kill Hive.
    kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
    check_call([kill_cmd], close_fds=True)

    # Kill the catalogd.
    catalogd = self.cluster.catalogd
    catalogd.kill()

    # The statestore should detect that the catalog service has gone down.
    statestored = self.cluster.statestored
    statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, timeout=60)

    try:
      # Start the catalog service asynchronously.
      catalogd.start()
      # Wait 10s to be sure that the catalogd is in the 'trying to connect' phase of its
      # startup.
      time.sleep(10)
      # Start Hive and wait for catalogd to come up.
      self.run_hive_server()
      statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
      impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60)
      # Metadata load should work now.
      self.reload_metadata(client)
    finally:
      # Make sure to clean up the catalogd process that we started.
      self.cleanup_process(catalogd)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(catalogd_args='--initial_hms_cnxn_timeout_s=30')
  def test_catalogd_fails_if_hms_started_late(self, vector):
    """IMPALA-4278: If HMS is not started within initial_hms_cnxn_timeout_s, the
    catalogd process fails."""
    # Make sure that catalogd is connected to HMS.
    impalad = self.cluster.get_any_impalad()
    client = impalad.service.create_beeswax_client()
    self.reload_metadata(client)

    # Kill Hive.
    kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
    check_call([kill_cmd], close_fds=True)

    # Kill the catalogd.
    catalogd = self.cluster.catalogd
    catalogd.kill()

    # The statestore should detect that the catalog service has gone down.
    statestored = self.cluster.statestored
    statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS - 1, timeout=60)

    try:
      # Start the catalog service asynchronously.
      catalogd.start()
      # Wait 40s to be sure that the catalogd has been trying to connect to HMS for
      # longer than initial_hms_cnxn_timeout_s.
      time.sleep(40)
      # Start Hive.
      self.run_hive_server()
      # The catalogd should have terminated by now.
      assert catalogd.get_pid() is None, "catalogd should have terminated"
    finally:
      # Make sure to clean up the catalogd process that we started.
      self.cleanup_process(catalogd)

    try:
      # Start the catalog service again and wait for it to come up.
      catalogd.start()
      statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
      impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60)
      # Metadata load should work now.
      self.reload_metadata(client)
    finally:
      # Make sure to clean up the catalogd process that we started.
      self.cleanup_process(catalogd)