blob: 5c249817cd60860b361e66a01b1d3f3f94541a6f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import glob
import os
import pytest
import re
import shutil
import subprocess
from tempfile import mkdtemp
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.test_dimensions import create_uncompressed_text_dimension
from tests.util.filesystem_utils import get_fs_path
class TestUdfPersistence(CustomClusterTestSuite):
""" Tests the behavior of UDFs and UDAs between catalog restarts. With IMPALA-1748,
these functions are persisted to the metastore and are loaded again during catalog
startup"""
DATABASE = 'udf_permanent_test'
JAVA_FN_TEST_DB = 'java_permanent_test'
HIVE_IMPALA_INTEGRATION_DB = 'hive_impala_integration_db'
HIVE_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/hive-exec.jar';
JAVA_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/impala-hive-udfs.jar';
LOCAL_LIBRARY_DIR = mkdtemp(dir="/tmp")
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestUdfPersistence, cls).setup_class()
@classmethod
def add_test_dimensions(cls):
super(TestUdfPersistence, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def setup_method(self, method):
super(TestUdfPersistence, self).setup_method(method)
impalad = self.cluster.impalads[0]
self.client = impalad.service.create_beeswax_client()
self.__cleanup()
self.__load_drop_functions(
self.CREATE_UDFS_TEMPLATE, self.DATABASE,
get_fs_path('/test-warehouse/libTestUdfs.so'))
self.__load_drop_functions(
self.DROP_SAMPLE_UDAS_TEMPLATE, self.DATABASE,
get_fs_path('/test-warehouse/libudasample.so'))
self.__load_drop_functions(
self.CREATE_SAMPLE_UDAS_TEMPLATE, self.DATABASE,
get_fs_path('/test-warehouse/libudasample.so'))
self.__load_drop_functions(
self.CREATE_TEST_UDAS_TEMPLATE, self.DATABASE,
get_fs_path('/test-warehouse/libTestUdas.so'))
self.uda_count =\
self.CREATE_SAMPLE_UDAS_TEMPLATE.count("create aggregate function") +\
self.CREATE_TEST_UDAS_TEMPLATE.count("create aggregate function")
self.udf_count = self.CREATE_UDFS_TEMPLATE.count("create function")
self.client.execute("CREATE DATABASE IF NOT EXISTS %s" % self.JAVA_FN_TEST_DB)
self.client.execute("CREATE DATABASE IF NOT EXISTS %s" %
self.HIVE_IMPALA_INTEGRATION_DB)
def teardown_method(self, method):
self.__cleanup()
def __cleanup(self):
self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % self.DATABASE)
self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % self.JAVA_FN_TEST_DB)
self.client.execute("DROP DATABASE IF EXISTS %s CASCADE"
% self.HIVE_IMPALA_INTEGRATION_DB)
shutil.rmtree(self.LOCAL_LIBRARY_DIR, ignore_errors=True)
def __load_drop_functions(self, template, database, location):
queries = template.format(database=database, location=location)
# Split queries and remove empty lines
queries = [q for q in queries.split(';') if q.strip()]
for query in queries:
result = self.client.execute(query)
assert result is not None
def __restart_cluster(self):
self._stop_impala_cluster()
self._start_impala_cluster(list())
impalad = self.cluster.impalads[0]
self.client = impalad.service.create_beeswax_client()
def verify_function_count(self, query, count):
result = self.client.execute(query)
assert result is not None and len(result.data) == count
@pytest.mark.execute_serially
def test_permanent_udfs(self):
# Make sure the pre-calculated count tallies with the number of
# functions shown using "show [aggregate] functions" statement
self.verify_function_count(
"SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count);
self.verify_function_count(
"SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), self.uda_count)
# invalidate metadata and make sure the count tallies
result = self.client.execute("INVALIDATE METADATA")
self.verify_function_count(
"SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count);
self.verify_function_count(
"SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), self.uda_count)
# Restart the cluster, this triggers a full metadata reload
self.__restart_cluster()
# Make sure the counts of udfs and udas match post restart
self.verify_function_count(
"SHOW FUNCTIONS in {0}".format(self.DATABASE), self.udf_count);
self.verify_function_count(
"SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), self.uda_count)
# Drop sample udas and verify the count matches pre and post restart
self.__load_drop_functions(
self.DROP_SAMPLE_UDAS_TEMPLATE, self.DATABASE,
get_fs_path('/test-warehouse/libudasample.so'))
self.verify_function_count(
"SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 1)
self.__restart_cluster()
self.verify_function_count(
"SHOW AGGREGATE FUNCTIONS in {0}".format(self.DATABASE), 1)
def __verify_udf_in_hive(self, udf):
(query, result) = self.SAMPLE_JAVA_UDFS_TEST[udf]
stdout = self.run_stmt_in_hive("select " + query.format(
db=self.HIVE_IMPALA_INTEGRATION_DB))
assert stdout is not None and result in str(stdout)
def __verify_udf_in_impala(self, udf):
(query, result) = self.SAMPLE_JAVA_UDFS_TEST[udf]
stdout = self.client.execute("select " + query.format(
db=self.HIVE_IMPALA_INTEGRATION_DB))
assert stdout is not None and result in str(stdout.data)
def __describe_udf_in_hive(self, udf, db=HIVE_IMPALA_INTEGRATION_DB):
""" Describe the specified function, returning stdout. """
# Hive 2+ caches UDFs, so we have to explicitly invalidate the UDF if
# we've made changes on the Impala side.
stmt = "RELOAD FUNCTION ; DESCRIBE FUNCTION {0}.{1}".format(db, udf)
return self.run_stmt_in_hive(stmt)
@SkipIfIsilon.hive
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfLocal.hive
@pytest.mark.execute_serially
def test_corrupt_java_udf(self):
""" IMPALA-3820: This tests if the Catalog server can gracefully handle
Java UDFs with unresolved dependencies."""
if self.exploration_strategy() != 'exhaustive': pytest.skip()
# Create a Java UDF with unresolved dependencies from Hive and
# restart the Catalog server. Catalog should ignore the
# function load.
self.run_stmt_in_hive("create function %s.corrupt_udf as \
'org.apache.impala.UnresolvedUdf' using jar '%s'"
% (self.JAVA_FN_TEST_DB, self.JAVA_UDF_JAR))
self.__restart_cluster()
# Make sure the function count is 0
self.verify_function_count(
"SHOW FUNCTIONS in {0}".format(self.JAVA_FN_TEST_DB), 0)
@SkipIfIsilon.hive
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfLocal.hive
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
def test_java_udfs_hive_integration(self):
''' This test checks the integration between Hive and Impala on
CREATE FUNCTION and DROP FUNCTION statements for persistent Java UDFs.
The main objective of the test is to check the following four cases.
- Add Java UDFs from Impala and make sure they are visible in Hive
- Drop Java UDFs from Impala and make sure this reflects in Hive.
- Add Java UDFs from Hive and make sure they are visitble in Impala
- Drop Java UDFs from Hive and make sure this reflects in Impala
'''
# Add Java UDFs from Impala and check if they are visible in Hive.
# Hive has bug that doesn't display the permanent function in show functions
# statement. So this test relies on describe function statement which prints
# a message if the function is not present.
udfs_to_test = list(self.SAMPLE_JAVA_UDFS)
if int(os.environ['IMPALA_HIVE_MAJOR_VERSION']) == 2:
udfs_to_test += self.SAMPLE_JAVA_UDFS_HIVE2_ONLY
for (fn, fn_symbol) in udfs_to_test:
self.client.execute(self.DROP_JAVA_UDF_TEMPLATE.format(
db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
self.client.execute(self.CREATE_JAVA_UDF_TEMPLATE.format(
db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn,
location=self.HIVE_UDF_JAR, symbol=fn_symbol))
hive_stdout = self.__describe_udf_in_hive(fn)
assert "does not exist" not in hive_stdout
self.__verify_udf_in_hive(fn)
# Drop the function from Impala and check if it reflects in Hive.
self.client.execute(self.DROP_JAVA_UDF_TEMPLATE.format(
db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
hive_stdout = self.__describe_udf_in_hive(fn)
assert "does not exist" in hive_stdout
# Create the same set of functions from Hive and make sure they are visible
# in Impala. There are two ways to make functions visible in Impala: invalidate
# metadata and refresh functions <db>.
REFRESH_COMMANDS = ["INVALIDATE METADATA",
"REFRESH FUNCTIONS {0}".format(self.HIVE_IMPALA_INTEGRATION_DB)]
for refresh_command in REFRESH_COMMANDS:
for (fn, fn_symbol) in udfs_to_test:
self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format(
db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn,
location=self.HIVE_UDF_JAR, symbol=fn_symbol))
self.client.execute(refresh_command)
for (fn, fn_symbol) in udfs_to_test:
result = self.client.execute("SHOW FUNCTIONS IN {0}".format(
self.HIVE_IMPALA_INTEGRATION_DB))
assert result is not None and len(result.data) > 0 and\
fn in str(result.data)
self.__verify_udf_in_impala(fn)
# Drop the function in Hive and make sure it reflects in Impala.
self.run_stmt_in_hive(self.DROP_JAVA_UDF_TEMPLATE.format(
db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
self.client.execute(refresh_command)
self.verify_function_count(
"SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0)
# Make sure we deleted all the temporary jars we copied to the local fs
assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
@SkipIfIsilon.hive
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfLocal.hive
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
def test_refresh_native(self):
''' This test checks that a native function is visible in Impala after a
REFRESH FUNCTIONS command. We will add the native function through Hive
by setting DBPROPERTIES of a database.'''
# First we create the function in Impala.
create_func_impala = ("create function {database}.identity_tmp(bigint) "
"returns bigint location '{location}' symbol='Identity'")
self.client.execute(create_func_impala.format(
database=self.HIVE_IMPALA_INTEGRATION_DB,
location=get_fs_path('/test-warehouse/libTestUdfs.so')))
# Impala puts the native function into a database property table. We extract the key
# value pair that represents the function from the table.
describe_db_hive = "DESCRIBE DATABASE EXTENDED {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB)
result = self.run_stmt_in_hive(describe_db_hive)
regex = r"{(.*?)=(.*?)}"
match = re.search(regex, result)
func_name = match.group(1)
func_contents = match.group(2)
# Recreate the database, this deletes the function.
self.client.execute("DROP DATABASE {database} CASCADE".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
self.client.execute("CREATE DATABASE {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result is not None and len(result.data) == 0
# Place the function into the recreated database by modifying it's properties.
alter_db_hive = "ALTER DATABASE {database} SET DBPROPERTIES ('{fn_name}'='{fn_val}')"
self.run_stmt_in_hive(alter_db_hive.format(
database=self.HIVE_IMPALA_INTEGRATION_DB,
fn_name=func_name,
fn_val=func_contents))
result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result is not None and len(result.data) == 0
# The function should be visible in Impala after a REFRESH FUNCTIONS.
self.client.execute("REFRESH FUNCTIONS {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result is not None and len(result.data) > 0 and\
"identity_tmp" in str(result.data)
# Verify that the function returns a correct result.
result = self.client.execute("SELECT {database}.identity_tmp(10)".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result.data[0] == "10"
# Make sure we deleted all the temporary jars we copied to the local fs
assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
@SkipIfIsilon.hive
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfLocal.hive
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
def test_refresh_replace(self):
''' This test checks that if we drop a function and then create a
different function with the same name in Hive, the new function will
be visible in Impala after REFRESH FUNCTIONS.'''
# Create an original function.
create_orig_func_hive = ("create function {database}.test_func as "
"'org.apache.hadoop.hive.ql.udf.UDFHex' using jar '{jar}'")
self.run_stmt_in_hive(create_orig_func_hive.format(
database=self.HIVE_IMPALA_INTEGRATION_DB, jar=self.JAVA_UDF_JAR))
result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result is not None and len(result.data) == 0
# Verify the function becomes visible in Impala after REFRESH FUNCTIONS.
self.client.execute("REFRESH FUNCTIONS {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert (result is not None and len(result.data) == 3 and
"test_func" in str(result.data))
result = self.client.execute("SELECT {database}.test_func(123)".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result.data[0] == "7B"
# Drop the original function and create a different function with the same name as
# the original, but a different JAR.
drop_orig_func_hive = "DROP FUNCTION {database}.test_func"
self.run_stmt_in_hive(drop_orig_func_hive.format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
create_replacement_func_hive = ("create function {database}.test_func as "
"'org.apache.hadoop.hive.ql.udf.UDFBin' using jar '{jar}'")
self.run_stmt_in_hive(create_replacement_func_hive.format(
database=self.HIVE_IMPALA_INTEGRATION_DB, jar=self.JAVA_UDF_JAR))
self.client.execute("REFRESH FUNCTIONS {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert (result is not None and len(result.data) == 1 and
"test_func" in str(result.data))
# Verify that the function has actually been updated.
result = self.client.execute("SELECT {database}.test_func(123)".format(
database=self.HIVE_IMPALA_INTEGRATION_DB))
assert result.data[0] == "1111011"
# Make sure we deleted all the temporary jars we copied to the local fs
assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
@pytest.mark.execute_serially
def test_java_udfs_from_impala(self):
""" This tests checks the behavior of permanent Java UDFs in Impala."""
self.verify_function_count(
"SHOW FUNCTIONS in {0}".format(self.JAVA_FN_TEST_DB), 0);
# Create a non persistent Java UDF and make sure we can't create a
# persistent Java UDF with same name
self.client.execute("create function %s.%s(boolean) returns boolean "\
"location '%s' symbol='%s'" % (self.JAVA_FN_TEST_DB, "identity",
self.JAVA_UDF_JAR, "org.apache.impala.TestUdf"))
result = self.execute_query_expect_failure(self.client,
self.CREATE_JAVA_UDF_TEMPLATE.format(db=self.JAVA_FN_TEST_DB,
function="identity", location=self.JAVA_UDF_JAR,
symbol="org.apache.impala.TestUdf"))
assert "Function already exists" in str(result)
# Test the same with a NATIVE function
self.client.execute("create function {database}.identity(int) "\
"returns int location '{location}' symbol='Identity'".format(
database=self.JAVA_FN_TEST_DB,
location="/test-warehouse/libTestUdfs.so"))
result = self.execute_query_expect_failure(self.client,
self.CREATE_JAVA_UDF_TEMPLATE.format(db=self.JAVA_FN_TEST_DB,
function="identity", location=self.JAVA_UDF_JAR,
symbol="org.apache.impala.TestUdf"))
assert "Function already exists" in str(result)
# Test the reverse. Add a persistent Java UDF and ensure we cannot
# add non persistent Java UDFs or NATIVE functions with the same name.
self.client.execute(self.CREATE_JAVA_UDF_TEMPLATE.format(
db=self.JAVA_FN_TEST_DB, function="identity_java",
location=self.JAVA_UDF_JAR, symbol="org.apache.impala.TestUdf"))
result = self.execute_query_expect_failure(self.client, "create function "\
"%s.%s(boolean) returns boolean location '%s' symbol='%s'" % (
self.JAVA_FN_TEST_DB, "identity_java", self.JAVA_UDF_JAR,
"org.apache.impala.TestUdf"))
assert "Function already exists" in str(result)
result = self.execute_query_expect_failure(self.client, "create function "\
"{database}.identity_java(int) returns int location '{location}' "\
"symbol='Identity'".format(database=self.JAVA_FN_TEST_DB,
location="/test-warehouse/libTestUdfs.so"))
assert "Function already exists" in str(result)
# With IF NOT EXISTS, the query shouldn't fail.
result = self.execute_query_expect_success(self.client, "create function "\
" if not exists {database}.identity_java(int) returns int location "\
"'{location}' symbol='Identity'".format(database=self.JAVA_FN_TEST_DB,
location="/test-warehouse/libTestUdfs.so"))
result = self.client.execute("SHOW FUNCTIONS in %s" % self.JAVA_FN_TEST_DB)
self.execute_query_expect_success(self.client,
"DROP FUNCTION IF EXISTS {db}.impala_java".format(db=self.JAVA_FN_TEST_DB))
# Drop the persistent Java function.
# Test the same create with IF NOT EXISTS. No exception should be thrown.
# Add a Java udf which has a few incompatible 'evaluate' functions in the
# symbol class. Catalog should load only the compatible ones. JavaUdfTest
# has 8 evaluate signatures out of which only 3 are valid.
compatibility_fn_count = 3
self.client.execute(self.CREATE_JAVA_UDF_TEMPLATE.format(
db=self.JAVA_FN_TEST_DB, function="compatibility",
location=self.JAVA_UDF_JAR, symbol="org.apache.impala.JavaUdfTest"))
self.verify_function_count(
"SHOW FUNCTIONS IN %s like 'compatibility*'" % self.JAVA_FN_TEST_DB,
compatibility_fn_count)
result = self.client.execute("SHOW FUNCTIONS in %s" % self.JAVA_FN_TEST_DB)
function_count = len(result.data)
# Invalidating metadata should preserve all the functions
self.client.execute("INVALIDATE METADATA")
self.verify_function_count(
"SHOW FUNCTIONS IN %s" % self.JAVA_FN_TEST_DB, function_count)
# Restarting the cluster should preserve only the persisted functions. In
# this case, identity(boolean) should be wiped out.
self.__restart_cluster()
self.verify_function_count(
"SHOW FUNCTIONS IN %s" % self.JAVA_FN_TEST_DB, function_count-1)
# Dropping persisted Java UDFs with old syntax should raise an exception
self.execute_query_expect_failure(self.client,
"DROP FUNCTION compatibility(smallint)")
self.verify_function_count(
"SHOW FUNCTIONS IN %s like 'compatibility*'" % self.JAVA_FN_TEST_DB, 3)
# Drop the functions and make sure they don't appear post restart.
self.client.execute("DROP FUNCTION %s.compatibility" % self.JAVA_FN_TEST_DB)
self.verify_function_count(
"SHOW FUNCTIONS IN %s like 'compatibility*'" % self.JAVA_FN_TEST_DB, 0)
self.__restart_cluster()
self.verify_function_count(
"SHOW FUNCTIONS IN %s like 'compatibility*'" % self.JAVA_FN_TEST_DB, 0)
# Try to load a UDF that has no compatible signatures. Make sure it is not added
# to Hive and Impala.
result = self.execute_query_expect_failure(self.client,
self.CREATE_JAVA_UDF_TEMPLATE.format(db=self.JAVA_FN_TEST_DB, function="badudf",
location=self.JAVA_UDF_JAR, symbol="org.apache.impala.IncompatibleUdfTest"))
assert "No compatible function signatures" in str(result)
self.verify_function_count(
"SHOW FUNCTIONS IN %s like 'badudf*'" % self.JAVA_FN_TEST_DB, 0)
result = self.__describe_udf_in_hive('badudf', db=self.JAVA_FN_TEST_DB)
assert "does not exist" in str(result)
# Create the same function from hive and make sure Impala doesn't load any signatures.
self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format(
db=self.JAVA_FN_TEST_DB, function="badudf",
location=self.JAVA_UDF_JAR, symbol="org.apache.impala.IncompatibleUdfTest"))
result = self.__describe_udf_in_hive('badudf', db=self.JAVA_FN_TEST_DB)
assert "does not exist" not in str(result)
self.client.execute("INVALIDATE METADATA")
self.verify_function_count(
"SHOW FUNCTIONS IN %s like 'badudf*'" % self.JAVA_FN_TEST_DB, 0)
# Add a function with the same name from Impala. It should fail.
result = self.execute_query_expect_failure(self.client,
self.CREATE_JAVA_UDF_TEMPLATE.format(db=self.JAVA_FN_TEST_DB, function="badudf",
location=self.JAVA_UDF_JAR, symbol="org.apache.impala.TestUdf"))
assert "Function badudf already exists" in str(result)
# Drop the function and make sure the function if dropped from hive
self.client.execute(self.DROP_JAVA_UDF_TEMPLATE.format(
db=self.JAVA_FN_TEST_DB, function="badudf"))
result = self.__describe_udf_in_hive('badudf', db=self.JAVA_FN_TEST_DB)
assert "does not exist" in str(result)
# Create sample UDA functions in {database} from library {location}
DROP_SAMPLE_UDAS_TEMPLATE = """
drop function if exists {database}.test_count(int);
drop function if exists {database}.hll(int);
drop function if exists {database}.sum_small_decimal(decimal(9,2));
"""
CREATE_JAVA_UDF_TEMPLATE = """
CREATE FUNCTION {db}.{function} LOCATION '{location}' symbol='{symbol}'
"""
CREATE_HIVE_UDF_TEMPLATE = """
CREATE FUNCTION {db}.{function} as '{symbol}' USING JAR '{location}'
"""
DROP_JAVA_UDF_TEMPLATE = "DROP FUNCTION IF EXISTS {db}.{function}"
# Sample java udfs from hive-exec.jar. Function name to symbol class mapping
SAMPLE_JAVA_UDFS = [
('udfpi', 'org.apache.hadoop.hive.ql.udf.UDFPI'),
('udfbin', 'org.apache.hadoop.hive.ql.udf.UDFBin'),
('udfhex', 'org.apache.hadoop.hive.ql.udf.UDFHex'),
('udfconv', 'org.apache.hadoop.hive.ql.udf.UDFConv'),
('udflike', 'org.apache.hadoop.hive.ql.udf.UDFLike'),
('udfsign', 'org.apache.hadoop.hive.ql.udf.UDFSign'),
('udfascii','org.apache.hadoop.hive.ql.udf.UDFAscii')
]
# These UDFs are available in Hive 2 but in Hive 3 are now implemented
# using a new GenericUDF interface that we don't support.
SAMPLE_JAVA_UDFS_HIVE2_ONLY = [
('udfhour', 'org.apache.hadoop.hive.ql.udf.UDFHour'),
('udfyear', 'org.apache.hadoop.hive.ql.udf.UDFYear'),
]
# Simple tests to verify java udfs in SAMPLE_JAVA_UDFS
SAMPLE_JAVA_UDFS_TEST = {
'udfpi' : ('{db}.udfpi()', '3.141592653589793'),
'udfbin' : ('{db}.udfbin(123)', '1111011'),
'udfhex' : ('{db}.udfhex(123)', '7B'),
'udfconv' : ('{db}.udfconv("100", 2, 10)', '4'),
'udfhour' : ('{db}.udfhour("12:55:12")', '12'),
'udflike' : ('{db}.udflike("abc", "def")', 'false'),
'udfsign' : ('{db}.udfsign(0)', '0'),
'udfyear' : ('{db}.udfyear("1990-02-06")', '1990'),
'udfascii' : ('{db}.udfascii("abc")','97')
}
CREATE_SAMPLE_UDAS_TEMPLATE = """
create database if not exists {database};
create aggregate function {database}.test_count(int) returns bigint
location '{location}' update_fn='CountUpdate';
create aggregate function {database}.hll(int) returns string
location '{location}' update_fn='HllUpdate';
create aggregate function {database}.sum_small_decimal(decimal(9,2))
returns decimal(9,2) location '{location}' update_fn='SumSmallDecimalUpdate';
"""
# Create test UDA functions in {database} from library {location}
CREATE_TEST_UDAS_TEMPLATE = """
drop function if exists {database}.trunc_sum(double);
create database if not exists {database};
create aggregate function {database}.trunc_sum(double)
returns bigint intermediate double location '{location}'
update_fn='TruncSumUpdate' merge_fn='TruncSumMerge'
serialize_fn='TruncSumSerialize' finalize_fn='TruncSumFinalize';
"""
# Create test UDF functions in {database} from library {location}
CREATE_UDFS_TEMPLATE = """
drop function if exists {database}.identity(boolean);
drop function if exists {database}.identity(tinyint);
drop function if exists {database}.identity(smallint);
drop function if exists {database}.identity(int);
drop function if exists {database}.identity(bigint);
drop function if exists {database}.identity(float);
drop function if exists {database}.identity(double);
drop function if exists {database}.identity(string);
drop function if exists {database}.identity(timestamp);
drop function if exists {database}.identity(date);
drop function if exists {database}.identity(decimal(9,0));
drop function if exists {database}.identity(decimal(18,1));
drop function if exists {database}.identity(decimal(38,10));
drop function if exists {database}.all_types_fn(
string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0),
date);
drop function if exists {database}.no_args();
drop function if exists {database}.var_and(boolean...);
drop function if exists {database}.var_sum(int...);
drop function if exists {database}.var_sum(double...);
drop function if exists {database}.var_sum(string...);
drop function if exists {database}.var_sum(decimal(4,2)...);
drop function if exists {database}.var_sum_multiply(double, int...);
drop function if exists {database}.constant_timestamp();
drop function if exists {database}.constant_date();
drop function if exists {database}.validate_arg_type(string);
drop function if exists {database}.count_rows();
drop function if exists {database}.constant_arg(int);
drop function if exists {database}.validate_open(int);
drop function if exists {database}.mem_test(bigint);
drop function if exists {database}.mem_test_leaks(bigint);
drop function if exists {database}.unmangled_symbol();
drop function if exists {database}.four_args(int, int, int, int);
drop function if exists {database}.five_args(int, int, int, int, int);
drop function if exists {database}.six_args(int, int, int, int, int, int);
drop function if exists {database}.seven_args(int, int, int, int, int, int, int);
drop function if exists {database}.eight_args(int, int, int, int, int, int, int, int);
create database if not exists {database};
create function {database}.identity(boolean) returns boolean
location '{location}' symbol='Identity';
create function {database}.identity(tinyint) returns tinyint
location '{location}' symbol='Identity';
create function {database}.identity(smallint) returns smallint
location '{location}' symbol='Identity';
create function {database}.identity(int) returns int
location '{location}' symbol='Identity';
create function {database}.identity(bigint) returns bigint
location '{location}' symbol='Identity';
create function {database}.identity(float) returns float
location '{location}' symbol='Identity';
create function {database}.identity(double) returns double
location '{location}' symbol='Identity';
create function {database}.identity(string) returns string
location '{location}'
symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE';
create function {database}.identity(timestamp) returns timestamp
location '{location}'
symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_12TimestampValE';
create function {database}.identity(date) returns date
location '{location}'
symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_7DateValE';
create function {database}.identity(decimal(9,0)) returns decimal(9,0)
location '{location}'
symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
create function {database}.identity(decimal(18,1)) returns decimal(18,1)
location '{location}'
symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
create function {database}.identity(decimal(38,10)) returns decimal(38,10)
location '{location}'
symbol='_Z8IdentityPN10impala_udf15FunctionContextERKNS_10DecimalValE';
create function {database}.all_types_fn(
string, boolean, tinyint, smallint, int, bigint, float, double, decimal(2,0),
date)
returns int
location '{location}' symbol='AllTypes';
create function {database}.no_args() returns string
location '{location}'
symbol='_Z6NoArgsPN10impala_udf15FunctionContextE';
create function {database}.var_and(boolean...) returns boolean
location '{location}' symbol='VarAnd';
create function {database}.var_sum(int...) returns int
location '{location}' symbol='VarSum';
create function {database}.var_sum(double...) returns double
location '{location}' symbol='VarSum';
create function {database}.var_sum(string...) returns int
location '{location}' symbol='VarSum';
create function {database}.var_sum(decimal(4,2)...) returns decimal(18,2)
location '{location}' symbol='VarSum';
create function {database}.var_sum_multiply(double, int...) returns double
location '{location}'
symbol='_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE';
create function {database}.constant_timestamp() returns timestamp
location '{location}' symbol='ConstantTimestamp';
create function {database}.constant_date() returns date
location '{location}' symbol='ConstantDate';
create function {database}.validate_arg_type(string) returns boolean
location '{location}' symbol='ValidateArgType';
create function {database}.count_rows() returns bigint
location '{location}' symbol='Count' prepare_fn='CountPrepare' close_fn='CountClose';
create function {database}.constant_arg(int) returns int
location '{location}' symbol='ConstantArg' prepare_fn='ConstantArgPrepare' close_fn='ConstantArgClose';
create function {database}.validate_open(int) returns boolean
location '{location}' symbol='ValidateOpen'
prepare_fn='ValidateOpenPrepare' close_fn='ValidateOpenClose';
create function {database}.mem_test(bigint) returns bigint
location '{location}' symbol='MemTest'
prepare_fn='MemTestPrepare' close_fn='MemTestClose';
create function {database}.mem_test_leaks(bigint) returns bigint
location '{location}' symbol='MemTest'
prepare_fn='MemTestPrepare';
"""