blob: b60f4e2692d19172287621ca44e4aaad3db6c2ef [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from time import sleep
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfBuildType
@SkipIfBuildType.not_dev_build
class TestCatalogWait(CustomClusterTestSuite):
    """Impalad coordinators must wait for their local replica of the catalog to be
    initialized from the statestore prior to opening up client ports.

    This test simulates a failed or slow catalog on impalad startup.
    """

    def expect_connection(self, impalad):
        """Check that Beeswax and HS2 clients can both be created for `impalad`.

        Any exception raised while creating either client propagates to the
        caller and therefore fails the test.
        """
        # NOTE(review): the clients created here are not closed by this helper;
        # confirm whether the test harness cleans them up.
        impalad.service.create_beeswax_client()
        impalad.service.create_hs2_client()

    def expect_no_connection(self, impalad):
        """Check that neither a Beeswax nor an HS2 client can be created.

        Each creation attempt must raise an exception whose text contains
        'Could not connect to'.
        """
        for create_client in (impalad.service.create_beeswax_client,
                              impalad.service.create_hs2_client):
            with pytest.raises(Exception) as e:
                create_client()
            # The message check has to run after the `with` block: inside it,
            # nothing following the raising call is ever executed.
            assert 'Could not connect to' in str(e.value)

    @pytest.mark.execute_serially
    def test_delayed_catalog(self):
        """Tests client interactions with the cluster when one of the daemons,
        impalad[2], is delayed in initializing its local catalog replica."""
        # On startup, expect only two executors to be registered.
        self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
                                   expected_num_executors=2,
                                   expected_subscribers=4)

        impalads = self.cluster.impalads
        ready_impalads = [impalads[0], impalads[1]]
        delayed_impalad = impalads[2]

        # Expect that impalad[2] is not ready.
        delayed_impalad.service.wait_for_metric_value('impala-server.ready', 0)

        # Expect that impalad[0,1] are both ready and with initialized catalog.
        for impalad in ready_impalads:
            impalad.service.wait_for_metric_value('impala-server.ready', 1)
            impalad.service.wait_for_metric_value('catalog.ready', 1)

        # Expect that connections can be made to impalads[0,1], but not to
        # impalads[2].
        for impalad in ready_impalads:
            self.expect_connection(impalad)
        self.expect_no_connection(delayed_impalad)

        # Issues a query to check that impalad[2] does not evaluate any fragments
        # and does not prematurely register itself as an executor. The former is
        # verified via query fragment metrics and the latter would fail if
        # registered but unable to process fragments.
        clients = [impalad.service.create_beeswax_client()
                   for impalad in ready_impalads]
        for client in clients:
            self.execute_query_expect_success(
                client, "select * from functional.alltypes")

        # Check that fragments were run on impalad[0,1] and none on impalad[2].
        # Each ready impalad runs a fragment per query and one coordinator
        # fragment. With two queries, one coordinated per ready impalad, that
        # should be 3 total fragments.
        for impalad in ready_impalads:
            impalad.service.wait_for_metric_value('impala-server.num-fragments', 3)
        delayed_impalad.service.wait_for_metric_value(
            'impala-server.num-fragments', 0)