blob: 1d821a0ee1d097efb928143241ac43f85109efbc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import re
from time import sleep
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import (
SkipIfS3,
SkipIfABFS,
SkipIfADLS,
SkipIfIsilon,
SkipIfLocal)
from tests.util.hive_utils import HiveDbWrapper
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfIsilon.hive
@SkipIfLocal.hive
class TestMetadataReplicas(CustomClusterTestSuite):
  """ Validates metadata content across catalogd and impalad coordinators."""

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def setup_class(cls):
    # Only run under the exhaustive exploration strategy.
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestMetadataReplicas, cls).setup_class()

  @pytest.mark.execute_serially
  def test_start(self):
    """ Baseline to verify that the initial state is identical. No DDL/DML
    is processed, so no objects are fully loaded."""
    self.__validate_metadata()

  @pytest.mark.execute_serially
  def test_catalog_restart(self, testid_checksum):
    """ IMPALA-6948: reproduces the issue by deleting a table from Hive while the catalogd
    is down. When catalogd is restarted, if the regression is present, the deleted
    table will still be present at the impalads."""
    db_name = "test_catalog_restart_%s" % testid_checksum
    # No 'except' clause: exceptions (including assertion failures) propagate unchanged
    # so pytest reports the original traceback and message.
    try:
      with HiveDbWrapper(self, db_name):
        # Issue several invalidates to boost the version for the current incarnation of
        # the catalog. As a result, the table we'll add to Hive will get a version that's
        # easier to see is higher than the highest version of the restarted catalogd
        # incarnation.
        for _ in range(50):
          self.client.execute("invalidate metadata functional.alltypes")
        assert self.cluster.catalogd.service.get_catalog_version() >= 50
        # Creates a table with Hive and makes it visible to Impala.
        self.run_stmt_in_hive("create table %s.x (a string)" % db_name)
        self.client.execute("invalidate metadata %s.x" % db_name)
        assert "x" in self.client.execute("show tables in %s" % db_name).data
        # Stops the catalog.
        self.cluster.catalogd.kill()
        # Drops the table from the catalog using Hive.
        self.run_stmt_in_hive("drop table %s.x" % db_name)
        # Restarts the catalog.
        self.cluster.catalogd.start()
        # Refreshes the state of the catalogd process.
        self.cluster.refresh()
        # Wait until the impalad catalog versions agree with the catalogd's version.
        catalogd_version = self.cluster.catalogd.service.get_catalog_version()
        for impalad in self.cluster.impalads:
          impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
        self.__validate_metadata()
    finally:
      # Hack to work-around IMPALA-5695.
      self.cluster.catalogd.kill()

  def __validate_metadata(self):
    """ Computes the pair-wise object version difference between the catalog contents
    in catalogd and each impalad. Asserts that there are no differences: no object
    only in catalogd, no object only in an impalad, and no object whose value
    (presumably its catalog version) differs between the two."""
    c_objects = self.cluster.catalogd.service.get_catalog_objects()
    i_objects = [proc.service.get_catalog_objects() for proc in self.cluster.impalads]
    for idx, i_obj in enumerate(i_objects):
      only_in_catalogd, only_in_impalad, mismatched = \
          self.__diff_catalog_objects(c_objects, i_obj)
      assert only_in_catalogd == {},\
          'catalogd has objects not in impalad(%d): %s ' % (idx, only_in_catalogd)
      assert only_in_impalad == {},\
          'impalad(%d) has objects not in catalogd: %s' % (idx, only_in_impalad)
      assert mismatched == {},\
          'impalad(%d) and catalogd version for objects differs: %s' % (idx, mismatched)

  def __diff_catalog_objects(self, a, b):
    """ Computes the diff between the input 'a' and 'b' dictionaries. The result is a
    list of length 3 where position 0 holds those entries that are in a, but not b,
    position 1 those entries that are in b, but not a, and position 2 holds entries
    where the key is in both a and b, but whose value differs. Every position is a
    dict (empty when there is no difference); position 2 maps each differing key to
    the pair (a[k], b[k])."""
    a_keys = set(a)
    b_keys = set(b)
    # diff[0] : a - b
    only_in_a = dict((k, a[k]) for k in a_keys - b_keys)
    # diff[1] : b - a
    only_in_b = dict((k, b[k]) for k in b_keys - a_keys)
    # diff[2] : a[k] != b[k]. Only keys present in both dicts are compared; indexing
    # 'b' with a key that exists solely in 'a' would raise KeyError.
    mismatched = dict((k, (a[k], b[k])) for k in a_keys & b_keys if a[k] != b[k])
    return [only_in_a, only_in_b, mismatched]