blob: 939c4a255a52e55ff789cc9241134e3c2d261847 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Tests to validate the Catalog Service client APIs.
import logging
import pytest
from CatalogService import CatalogService
from CatalogService.CatalogService import TGetFunctionsRequest
from ErrorCodes.ttypes import TErrorCode
from thrift.protocol import TBinaryProtocol
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfDockerizedCluster
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.util.filesystem_utils import WAREHOUSE
from tests.util.thrift_util import create_transport
# Module-level logger; the tests below log raw GetFunctions() responses at DEBUG level.
LOG = logging.getLogger('test_catalog_service_client')
# TODO: Add a test that asserts correct/compatible responses
# to create/drop function requests. For example, BDR relies
# on a stable catalog Thrift API.
class TestCatalogServiceClient(ImpalaTestSuite):
  """Tests that talk to catalogd directly through its Thrift CatalogService API."""

  @classmethod
  def get_workload(cls):
    """Returns the workload name ('functional-query') used by this suite."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Restricts the matrix to a single exec-option set and uncompressed parquet."""
    super(TestCatalogServiceClient, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    # There is no reason to run these tests using all dimensions.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet' and
        v.get_value('table_format').compression_codec == 'none')

  @SkipIfDockerizedCluster.catalog_service_not_exposed
  def test_get_functions(self, vector, unique_database):
    """Checks GetFunctions() responses as UDFs/UDAs are added to a database.

    Opens a Thrift connection straight to catalogd (kerberos transport when the
    run is configured for kerberos, buffered otherwise), runs the checks, and
    always closes the connection afterwards. 'vector' is not used directly.
    """
    impala_cluster = ImpalaCluster.get_e2e_test_cluster()
    catalogd = impala_cluster.catalogd.service
    trans_type = 'kerberos' if pytest.config.option.use_kerberos else 'buffered'
    transport = create_transport(host=catalogd.hostname, port=catalogd.service_port,
                                 service='impala', transport_type=trans_type)
    transport.open()
    try:
      protocol = TBinaryProtocol.TBinaryProtocol(transport)
      catalog_client = CatalogService.Client(protocol)
      self._check_get_functions(catalog_client, unique_database)
    finally:
      # Close explicitly so the catalogd connection is not leaked, including
      # when one of the assertions fails part-way through.
      transport.close()

  def _check_get_functions(self, catalog_client, db_name):
    """Runs the GetFunctions() checks against 'db_name' via 'catalog_client'.

    'db_name' must start out with no functions; functions are created in it
    through self.client as the checks progress.
    """
    request = TGetFunctionsRequest()
    request.db_name = db_name

    # A freshly created database contains no functions.
    response = catalog_client.GetFunctions(request)
    LOG.debug(response)
    assert response.status.status_code == TErrorCode.OK
    assert len(response.functions) == 0

    # A zero-argument scalar UDF is returned with its name, kind and location.
    self.client.execute("create function %s.fn() RETURNS int "
                        "LOCATION '%s/libTestUdfs.so' SYMBOL='Fn'"
                        % (db_name, WAREHOUSE))
    response = catalog_client.GetFunctions(request)
    LOG.debug(response)
    assert response.status.status_code == TErrorCode.OK
    assert len(response.functions) == 1
    fn = response.functions[0]
    assert len(fn.arg_types) == 0
    assert fn.name.db_name == db_name
    assert fn.name.function_name == 'fn'
    assert fn.aggregate_fn is None
    assert fn.scalar_fn is not None
    assert '/test-warehouse/libTestUdfs.so' in fn.hdfs_location

    # Add another scalar function with overloaded parameters; both overloads
    # must be returned.
    self.client.execute("create function %s.fn(int) RETURNS double "
                        "LOCATION '%s/libTestUdfs.so' SYMBOL='Fn'" % (db_name, WAREHOUSE))
    response = catalog_client.GetFunctions(request)
    LOG.debug(response)
    assert response.status.status_code == TErrorCode.OK
    assert len(response.functions) == 2
    # Response order is not relied upon: sort by number of args (ascending).
    functions = sorted(response.functions, key=lambda f: len(f.arg_types))
    assert len(functions[0].arg_types) == 0
    assert len(functions[1].arg_types) == 1
    assert functions[0].signature == 'fn()'
    assert functions[1].signature == 'fn(INT)'

    # Verify aggregate functions can also be retrieved.
    self.client.execute("create aggregate function %s.agg_fn(int, string) RETURNS int "
                        "LOCATION '%s/libTestUdas.so' UPDATE_FN='TwoArgUpdate'"
                        % (db_name, WAREHOUSE))
    response = catalog_client.GetFunctions(request)
    LOG.debug(response)
    assert response.status.status_code == TErrorCode.OK
    assert len(response.functions) == 3
    aggregates = [f for f in response.functions if f.aggregate_fn is not None]
    # Should be only 1 aggregate function.
    assert len(aggregates) == 1

    # Negative test cases for the database name: a database that does not exist...
    request.db_name = db_name + "_does_not_exist"
    response = catalog_client.GetFunctions(request)
    LOG.debug(response)
    assert response.status.status_code == TErrorCode.GENERAL
    assert 'Database does not exist: ' in str(response.status)

    # ...and a request that does not set a database name at all.
    response = catalog_client.GetFunctions(TGetFunctionsRequest())
    LOG.debug(response)
    assert response.status.status_code == TErrorCode.GENERAL
    assert 'Database name must be set' in str(response.status)