| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.udf.dynamic; |
| |
| import com.google.common.collect.Lists; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.commons.lang3.reflect.FieldUtils; |
| import org.apache.drill.categories.SlowTest; |
| import org.apache.drill.categories.SqlFunctionTest; |
| import org.apache.drill.common.config.ConfigConstants; |
| import org.apache.drill.common.exceptions.UserRemoteException; |
| import org.apache.drill.exec.exception.VersionMismatchException; |
| import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; |
| import org.apache.drill.exec.expr.fn.registry.LocalFunctionRegistry; |
| import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; |
| import org.apache.drill.exec.proto.UserBitShared.Jar; |
| import org.apache.drill.exec.proto.UserBitShared.Registry; |
| import org.apache.drill.exec.server.DrillbitContext; |
| import org.apache.drill.exec.store.sys.store.DataChangeVersion; |
| import org.apache.drill.exec.util.JarUtil; |
| import org.apache.drill.test.BaseTestQuery; |
| import org.apache.drill.test.TestBuilder; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.ExpectedException; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| |
| import static org.apache.drill.test.HadoopUtils.hadoopToJavaPath; |
| import static org.hamcrest.CoreMatchers.containsString; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyInt; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.reset; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| |
| @Category({SlowTest.class, SqlFunctionTest.class}) |
| public class TestDynamicUDFSupport extends BaseTestQuery { |
| |
  // Name (without extension/suffix) of the default UDF jar built once per class.
  private static final String DEFAULT_JAR_NAME = "drill-custom-lower";
  // URI of the local file system used by the drillbit under test; appears in expected error messages.
  private static URI fsUri;
  // Directory holding pristine copies of the default jars, reused across tests.
  private static File jarsDir;
  // Output directory where JarBuilder writes freshly built jars.
  private static File buildDirectory;
  // Builds binary and source jars from the test UDF project under src/test/resources/drill-udf.
  private static JarBuilder jarBuilder;
  // File names of the default binary jar and its companion source jar.
  private static String defaultBinaryJar;
  private static String defaultSourceJar;

  // Rule-based expected-exception support for tests that assert failures.
  @Rule
  public ExpectedException thrown = ExpectedException.none();
| |
  /**
   * Builds the default UDF binary and source jars once for the whole test class
   * and stores copies in {@code jarsDir} so individual tests can stage them
   * without rebuilding.
   */
  @BeforeClass
  public static void buildAndStoreDefaultJars() throws IOException {
    jarsDir = dirTestWatcher.makeSubDir(Paths.get("jars"));
    buildDirectory = dirTestWatcher.makeSubDir(Paths.get("drill-udf"));

    jarBuilder = new JarBuilder("src/test/resources/drill-udf");
    defaultBinaryJar = buildJars(DEFAULT_JAR_NAME, "**/CustomLowerFunction.java", null);
    // The source jar name is derived from the binary jar name by convention.
    defaultSourceJar = JarUtil.getSourceName(defaultBinaryJar);

    // Keep pristine copies; tests copy from jarsDir into the staging area as needed.
    FileUtils.copyFileToDirectory(new File(buildDirectory, defaultBinaryJar), jarsDir);
    FileUtils.copyFileToDirectory(new File(buildDirectory, defaultSourceJar), jarsDir);
  }
| |
  /**
   * Starts a fresh single-node cluster before each test so that registry state
   * does not leak between tests, and records the local file system URI that is
   * interpolated into expected error messages.
   */
  @Before
  public void setupNewDrillbit() throws Exception {
    updateTestCluster(1, config);
    fsUri = getLocalFileSystem().getUri();
  }
| |
  /**
   * Shuts down the client and clears the test watcher's directories after each
   * test, removing staged/registered jars left behind by the test body.
   */
  @After
  public void cleanup() throws Exception {
    closeClient();
    dirTestWatcher.clear();
  }
| |
| @Test |
| public void testSyntax() throws Exception { |
| test("create function using jar 'jar_name.jar'"); |
| test("drop function using jar 'jar_name.jar'"); |
| } |
| |
| @Test |
| public void testEnableDynamicSupport() throws Exception { |
| try { |
| test("alter system set `exec.udf.enable_dynamic_support` = true"); |
| test("create function using jar 'jar_name.jar'"); |
| test("drop function using jar 'jar_name.jar'"); |
| } finally { |
| test("alter system reset `exec.udf.enable_dynamic_support`"); |
| } |
| } |
| |
| @Test |
| public void testDisableDynamicSupportCreate() throws Exception { |
| try { |
| test("alter system set `exec.udf.enable_dynamic_support` = false"); |
| String query = "create function using jar 'jar_name.jar'"; |
| thrown.expect(UserRemoteException.class); |
| thrown.expectMessage(containsString("Dynamic UDFs support is disabled.")); |
| test(query); |
| } finally { |
| test("alter system reset `exec.udf.enable_dynamic_support`"); |
| } |
| } |
| |
| @Test |
| public void testDisableDynamicSupportDrop() throws Exception { |
| try { |
| test("alter system set `exec.udf.enable_dynamic_support` = false"); |
| String query = "drop function using jar 'jar_name.jar'"; |
| thrown.expect(UserRemoteException.class); |
| thrown.expectMessage(containsString("Dynamic UDFs support is disabled.")); |
| test(query); |
| } finally { |
| test("alter system reset `exec.udf.enable_dynamic_support`"); |
| } |
| } |
| |
| @Test |
| public void testAbsentBinaryInStaging() throws Exception { |
| Path staging = hadoopToJavaPath(getDrillbitContext().getRemoteFunctionRegistry().getStagingArea()); |
| |
| String summary = String.format("File %s does not exist on file system %s", |
| staging.resolve(defaultBinaryJar).toUri().getPath(), fsUri); |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", defaultBinaryJar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, summary) |
| .go(); |
| } |
| |
| @Test |
| public void testAbsentSourceInStaging() throws Exception { |
| Path staging = hadoopToJavaPath(getDrillbitContext().getRemoteFunctionRegistry().getStagingArea()); |
| copyJar(jarsDir.toPath(), staging, defaultBinaryJar); |
| |
| String summary = String.format("File %s does not exist on file system %s", |
| staging.resolve(defaultSourceJar).toUri().getPath(), fsUri); |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", defaultBinaryJar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, summary) |
| .go(); |
| } |
| |
| @Test |
| public void testJarWithoutMarkerFile() throws Exception { |
| String jarName = "drill-no-marker"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, null, "**/dummy.conf"); |
| |
| String summary = "Marker file %s is missing in %s"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", jar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, |
| ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jar)) |
| .go(); |
| } |
| |
| @Test |
| public void testJarWithoutFunctions() throws Exception { |
| String jarName = "drill-no-functions"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomLowerDummyFunction.java", null); |
| |
| String summary = "Jar %s does not contain functions"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", jar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, jar)) |
| .go(); |
| } |
| |
| @Test |
| public void testSuccessfulRegistration() throws Exception { |
| copyDefaultJarsToStagingArea(); |
| |
| String summary = "The following UDFs in jar %s have been registered:\n" + |
| "[custom_lower(VARCHAR-REQUIRED)]"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", defaultBinaryJar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(true, String.format(summary, defaultBinaryJar)) |
| .go(); |
| |
| RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry(); |
| FileSystem fs = remoteFunctionRegistry.getFs(); |
| |
| assertFalse("Staging area should be empty", fs.listFiles(remoteFunctionRegistry.getStagingArea(), false).hasNext()); |
| assertFalse("Temporary area should be empty", fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext()); |
| |
| Path path = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea()); |
| |
| assertTrue("Binary should be present in registry area", |
| path.resolve(defaultBinaryJar).toFile().exists()); |
| assertTrue("Source should be present in registry area", |
| path.resolve(defaultBinaryJar).toFile().exists()); |
| |
| Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion()); |
| assertEquals("Registry should contain one jar", registry.getJarList().size(), 1); |
| assertEquals(registry.getJar(0).getName(), defaultBinaryJar); |
| } |
| |
| @Test |
| public void testDuplicatedJarInRemoteRegistry() throws Exception { |
| copyDefaultJarsToStagingArea(); |
| test("create function using jar '%s'", defaultBinaryJar); |
| copyDefaultJarsToStagingArea(); |
| |
| String summary = "Jar with %s name has been already registered"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", defaultBinaryJar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, defaultBinaryJar)) |
| .go(); |
| } |
| |
| @Test |
| public void testDuplicatedJarInLocalRegistry() throws Exception { |
| String jarName = "drill-custom-upper"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomUpperFunction.java", null); |
| |
| test("create function using jar '%s'", jar); |
| test("select custom_upper('A') from (values(1))"); |
| |
| copyJarsToStagingArea(buildDirectory.toPath(), jar,JarUtil.getSourceName(jar)); |
| |
| String summary = "Jar with %s name has been already registered"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", jar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, jar)) |
| .go(); |
| } |
| |
| @Test |
| public void testDuplicatedFunctionsInRemoteRegistry() throws Exception { |
| copyDefaultJarsToStagingArea(); |
| test("create function using jar '%s'", defaultBinaryJar); |
| |
| String jarName = "drill-custom-lower-copy"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomLowerFunction.java", null); |
| |
| String summary = "Found duplicated function in %s: custom_lower(VARCHAR-REQUIRED)"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", jar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, defaultBinaryJar)) |
| .go(); |
| } |
| |
| @Test |
| public void testDuplicatedFunctionsInLocalRegistry() throws Exception { |
| String jarName = "drill-lower"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, "**/LowerFunction.java", null); |
| |
| String summary = "Found duplicated function in %s: lower(VARCHAR-REQUIRED)"; |
| |
| testBuilder() |
| .sqlQuery("create function using jar '%s'", jar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, LocalFunctionRegistry.BUILT_IN)) |
| .go(); |
| } |
| |
  /**
   * Registration must retry on {@link VersionMismatchException} from the remote
   * registry and eventually succeed: the first two update attempts fail, the
   * third (real) call goes through, and all areas end up in the expected state.
   */
  @Test
  public void testSuccessfulRegistrationAfterSeveralRetryAttempts() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
    Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
    Path stagingPath = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
    Path tmpPath = hadoopToJavaPath(remoteFunctionRegistry.getTmpArea());

    copyDefaultJarsToStagingArea();

    // Fail twice with a version mismatch, then delegate to the real implementation.
    doThrow(new VersionMismatchException("Version mismatch detected", 1))
        .doThrow(new VersionMismatchException("Version mismatch detected", 1))
        .doCallRealMethod()
        .when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    String summary = "The following UDFs in jar %s have been registered:\n" +
        "[custom_lower(VARCHAR-REQUIRED)]";

    testBuilder()
        .sqlQuery("create function using jar '%s'", defaultBinaryJar)
        .unOrdered()
        .baselineColumns("ok", "summary")
        .baselineValues(true, String.format(summary, defaultBinaryJar))
        .go();

    // Two failed attempts plus the final successful call.
    verify(remoteFunctionRegistry, times(3))
        .updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    assertTrue("Staging area should be empty", ArrayUtils.isEmpty(stagingPath.toFile().listFiles()));
    assertTrue("Temporary area should be empty", ArrayUtils.isEmpty(tmpPath.toFile().listFiles()));

    assertTrue("Binary should be present in registry area",
        registryPath.resolve(defaultBinaryJar).toFile().exists());
    assertTrue("Source should be present in registry area",
        registryPath.resolve(defaultSourceJar).toFile().exists());

    Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
    assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
    assertEquals(registry.getJar(0).getName(), defaultBinaryJar);
  }
| |
  /**
   * Unregistration must also retry on {@link VersionMismatchException}: after
   * a successful registration, the spy fails the first two registry updates,
   * the third succeeds, and the registry area ends up empty.
   */
  @Test
  public void testSuccessfulUnregistrationAfterSeveralRetryAttempts() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
    copyDefaultJarsToStagingArea();
    test("create function using jar '%s'", defaultBinaryJar);

    // Clear stubbing/interaction history recorded during registration.
    reset(remoteFunctionRegistry);
    doThrow(new VersionMismatchException("Version mismatch detected", 1))
        .doThrow(new VersionMismatchException("Version mismatch detected", 1))
        .doCallRealMethod()
        .when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    String summary = "The following UDFs in jar %s have been unregistered:\n" +
        "[custom_lower(VARCHAR-REQUIRED)]";

    testBuilder()
        .sqlQuery("drop function using jar '%s'", defaultBinaryJar)
        .unOrdered()
        .baselineColumns("ok", "summary")
        .baselineValues(true, String.format(summary, defaultBinaryJar))
        .go();

    // Two failed attempts plus the final successful call.
    verify(remoteFunctionRegistry, times(3))
        .updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    FileSystem fs = remoteFunctionRegistry.getFs();

    assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
    assertEquals("Registry should be empty",
        remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
  }
| |
  /**
   * When every registry update fails with a version mismatch, registration must
   * give up after the configured retry limit and roll back: jars remain in
   * staging, and the registry/tmp areas stay empty.
   */
  @Test
  public void testExceedRetryAttemptsDuringRegistration() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
    Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
    Path stagingPath = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
    Path tmpPath = hadoopToJavaPath(remoteFunctionRegistry.getTmpArea());

    copyDefaultJarsToStagingArea();

    // Every update attempt fails — no doCallRealMethod fallback.
    doThrow(new VersionMismatchException("Version mismatch detected", 1))
        .when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    String summary = "Failed to update remote function registry. Exceeded retry attempts limit.";

    testBuilder()
        .sqlQuery("create function using jar '%s'", defaultBinaryJar)
        .unOrdered()
        .baselineColumns("ok", "summary")
        .baselineValues(false, summary)
        .go();

    // Initial attempt plus the configured number of retries.
    verify(remoteFunctionRegistry, times(remoteFunctionRegistry.getRetryAttempts() + 1))
        .updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    // Rollback leaves the staged jars in place for a later retry by the user.
    assertTrue("Binary should be present in staging area",
        stagingPath.resolve(defaultBinaryJar).toFile().exists());
    assertTrue("Source should be present in staging area",
        stagingPath.resolve(defaultSourceJar).toFile().exists());

    assertTrue("Registry area should be empty", ArrayUtils.isEmpty(registryPath.toFile().listFiles()));
    assertTrue("Temporary area should be empty", ArrayUtils.isEmpty(tmpPath.toFile().listFiles()));

    assertEquals("Registry should be empty",
        remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
  }
| |
  /**
   * When unregistration exhausts its retries, the previously registered jar
   * must remain fully intact: files stay in the registry area and the remote
   * registry still lists the jar.
   */
  @Test
  public void testExceedRetryAttemptsDuringUnregistration() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
    Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());

    copyDefaultJarsToStagingArea();
    test("create function using jar '%s'", defaultBinaryJar);

    // Clear registration interactions, then make every update attempt fail.
    reset(remoteFunctionRegistry);
    doThrow(new VersionMismatchException("Version mismatch detected", 1))
        .when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    String summary = "Failed to update remote function registry. Exceeded retry attempts limit.";

    testBuilder()
        .sqlQuery("drop function using jar '%s'", defaultBinaryJar)
        .unOrdered()
        .baselineColumns("ok", "summary")
        .baselineValues(false, summary)
        .go();

    // Initial attempt plus the configured number of retries.
    verify(remoteFunctionRegistry, times(remoteFunctionRegistry.getRetryAttempts() + 1))
        .updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    assertTrue("Binary should be present in registry area",
        registryPath.resolve(defaultBinaryJar).toFile().exists());
    assertTrue("Source should be present in registry area",
        registryPath.resolve(defaultSourceJar).toFile().exists());

    Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
    assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
    assertEquals(registry.getJar(0).getName(), defaultBinaryJar);
  }
| |
| @Test |
| public void testLazyInit() throws Exception { |
| thrown.expect(UserRemoteException.class); |
| thrown.expectMessage(containsString("No match found for function signature custom_lower(<CHARACTER>)")); |
| test("select custom_lower('A') from (values(1))"); |
| |
| copyDefaultJarsToStagingArea(); |
| test("create function using jar '%s'", defaultBinaryJar); |
| testBuilder() |
| .sqlQuery("select custom_lower('A') as res from (values(1))") |
| .unOrdered() |
| .baselineColumns("res") |
| .baselineValues("a") |
| .go(); |
| |
| Path localUdfDirPath = hadoopToJavaPath((org.apache.hadoop.fs.Path)FieldUtils.readField( |
| getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir", true)); |
| |
| assertTrue("Binary should exist in local udf directory", |
| localUdfDirPath.resolve(defaultBinaryJar).toFile().exists()); |
| assertTrue("Source should exist in local udf directory", |
| localUdfDirPath.resolve(defaultSourceJar).toFile().exists()); |
| } |
| |
| @Test |
| public void testLazyInitWhenDynamicUdfSupportIsDisabled() throws Exception { |
| thrown.expect(UserRemoteException.class); |
| thrown.expectMessage(containsString("No match found for function signature custom_lower(<CHARACTER>)")); |
| test("select custom_lower('A') from (values(1))"); |
| |
| copyDefaultJarsToStagingArea(); |
| test("create function using jar '%s'", defaultBinaryJar); |
| |
| try { |
| testBuilder() |
| .sqlQuery("select custom_lower('A') as res from (values(1))") |
| .optionSettingQueriesForTestQuery("alter system set `exec.udf.enable_dynamic_support` = false") |
| .unOrdered() |
| .baselineColumns("res") |
| .baselineValues("a") |
| .go(); |
| } finally { |
| test("alter system reset `exec.udf.enable_dynamic_support`"); |
| } |
| } |
| |
| @Test |
| public void testOverloadedFunctionPlanningStage() throws Exception { |
| String jarName = "drill-custom-abs"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomAbsFunction.java", null); |
| |
| test("create function using jar '%s'", jar); |
| |
| testBuilder() |
| .sqlQuery("select abs('A', 'A') as res from (values(1))") |
| .unOrdered() |
| .baselineColumns("res") |
| .baselineValues("ABS was overloaded. Input: A, A") |
| .go(); |
| } |
| |
| @Test |
| public void testOverloadedFunctionExecutionStage() throws Exception { |
| String jarName = "drill-custom-log"; |
| String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomLogFunction.java", null); |
| |
| test("create function using jar '%s'", jar); |
| |
| testBuilder() |
| .sqlQuery("select log('A') as res from (values(1))") |
| .unOrdered() |
| .baselineColumns("res") |
| .baselineValues("LOG was overloaded. Input: A") |
| .go(); |
| } |
| |
  /**
   * Dropping a registered, locally loaded function must remove it everywhere:
   * the remote registry, the registry area, and the local UDF directory, and
   * subsequent queries must fail to resolve it.
   */
  @Test
  public void testDropFunction() throws Exception {
    copyDefaultJarsToStagingArea();
    test("create function using jar '%s'", defaultBinaryJar);
    // Run a query so the jars are lazily loaded into the local udf directory.
    test("select custom_lower('A') from (values(1))");

    // localUdfDir is private on the registry; read it reflectively.
    Path localUdfDirPath = hadoopToJavaPath((org.apache.hadoop.fs.Path)FieldUtils.readField(
        getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir", true));

    assertTrue("Binary should exist in local udf directory",
        localUdfDirPath.resolve(defaultBinaryJar).toFile().exists());
    assertTrue("Source should exist in local udf directory",
        localUdfDirPath.resolve(defaultSourceJar).toFile().exists());

    String summary = "The following UDFs in jar %s have been unregistered:\n" +
        "[custom_lower(VARCHAR-REQUIRED)]";

    testBuilder()
        .sqlQuery("drop function using jar '%s'", defaultBinaryJar)
        .unOrdered()
        .baselineColumns("ok", "summary")
        .baselineValues(true, String.format(summary, defaultBinaryJar))
        .go();

    // The function must no longer resolve after the drop.
    try {
      test("select custom_lower('A') from (values(1))");
      fail();
    } catch (UserRemoteException e) {
      assertTrue(e.getMessage().contains("No match found for function signature custom_lower(<CHARACTER>)"));
    }

    RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry();
    Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());

    assertEquals("Remote registry should be empty",
        remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);

    assertFalse("Binary should not be present in registry area",
        registryPath.resolve(defaultBinaryJar).toFile().exists());
    assertFalse("Source should not be present in registry area",
        registryPath.resolve(defaultSourceJar).toFile().exists());

    assertFalse("Binary should not be present in local udf directory",
        localUdfDirPath.resolve(defaultBinaryJar).toFile().exists());
    assertFalse("Source should not be present in local udf directory",
        localUdfDirPath.resolve(defaultSourceJar).toFile().exists());
  }
| |
  /**
   * Dropping a jar and re-registering a jar with the same name but different
   * content must pick up the new function implementation (result "a_v2").
   */
  @Test
  public void testReRegisterTheSameJarWithDifferentContent() throws Exception {
    copyDefaultJarsToStagingArea();
    test("create function using jar '%s'", defaultBinaryJar);
    testBuilder()
        .sqlQuery("select custom_lower('A') as res from (values(1))")
        .unOrdered()
        .baselineColumns("res")
        .baselineValues("a")
        .go();
    test("drop function using jar '%s'", defaultBinaryJar);

    // NOTE(review): presumably this sleep guarantees the rebuilt jar gets a
    // distinct timestamp so the new content is detected — confirm; timing-based
    // waits like this can be flaky.
    Thread.sleep(1000);

    // Rebuild the jar under the SAME name but from the V2 function source.
    buildAndCopyJarsToStagingArea(DEFAULT_JAR_NAME, "**/CustomLowerFunctionV2.java", null);

    test("create function using jar '%s'", defaultBinaryJar);
    testBuilder()
        .sqlQuery("select custom_lower('A') as res from (values(1))")
        .unOrdered()
        .baselineColumns("res")
        .baselineValues("a_v2")
        .go();
  }
| |
| @Test |
| public void testDropAbsentJar() throws Exception { |
| String summary = "Jar %s is not registered in remote registry"; |
| |
| testBuilder() |
| .sqlQuery("drop function using jar '%s'", defaultBinaryJar) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(false, String.format(summary, defaultBinaryJar)) |
| .go(); |
| } |
| |
  /**
   * If the remote registry update itself fails after the jars were copied to
   * the registry area, registration must roll back: jars return to staging and
   * the registry/tmp areas are cleaned up.
   */
  @Test
  public void testRegistrationFailDuringRegistryUpdate() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
    Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
    Path stagingPath = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
    Path tmpPath = hadoopToJavaPath(remoteFunctionRegistry.getTmpArea());

    final String errorMessage = "Failure during remote registry update.";
    // At the moment updateRegistry is called the jars must already have been
    // copied into the registry area; assert that, then simulate the failure.
    doAnswer(invocation -> {
      assertTrue("Binary should be present in registry area",
          registryPath.resolve(defaultBinaryJar).toFile().exists());
      assertTrue("Source should be present in registry area",
          registryPath.resolve(defaultSourceJar).toFile().exists());
      throw new RuntimeException(errorMessage);
    }).when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    copyDefaultJarsToStagingArea();

    testBuilder()
        .sqlQuery("create function using jar '%s'", defaultBinaryJar)
        .unOrdered()
        .baselineColumns("ok", "summary")
        .baselineValues(false, errorMessage)
        .go();

    // Rollback: registry and tmp areas emptied, jars restored to staging.
    assertTrue("Registry area should be empty", ArrayUtils.isEmpty(registryPath.toFile().listFiles()));
    assertTrue("Temporary area should be empty", ArrayUtils.isEmpty(tmpPath.toFile().listFiles()));

    assertTrue("Binary should be present in staging area", stagingPath.resolve(defaultBinaryJar).toFile().exists());
    assertTrue("Source should be present in staging area", stagingPath.resolve(defaultSourceJar).toFile().exists());
  }
| |
  /**
   * While one registration of a jar is in flight, concurrent CREATE and DROP
   * for the same jar name must be rejected with an "in use" message.
   *
   * Latch choreography: the first addToJars call (from the background thread)
   * completes its real work, releases latch2 so the main thread proceeds, then
   * parks on latch1 — holding the jar "in use" until the main-thread checks
   * are done.
   */
  @Test
  public void testConcurrentRegistrationOfTheSameJar() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();

    final CountDownLatch latch1 = new CountDownLatch(1);
    final CountDownLatch latch2 = new CountDownLatch(1);

    doAnswer(invocation -> {
      String result = (String) invocation.callRealMethod();
      latch2.countDown();
      latch1.await();
      return result;
    })
        .doCallRealMethod()
        .doCallRealMethod()
        .when(remoteFunctionRegistry).addToJars(anyString(), any(RemoteFunctionRegistry.Action.class));


    final String query = String.format("create function using jar '%s'", defaultBinaryJar);

    // First registration runs in the background and blocks inside addToJars.
    Thread thread = new Thread(new SimpleQueryRunner(query));
    thread.start();
    latch2.await();

    try {
      String summary = "Jar with %s name is used. Action: REGISTRATION";

      // Concurrent CREATE of the same jar is rejected.
      testBuilder()
          .sqlQuery(query)
          .unOrdered()
          .baselineColumns("ok", "summary")
          .baselineValues(false, String.format(summary, defaultBinaryJar))
          .go();

      // Concurrent DROP of the same jar is rejected as well.
      testBuilder()
          .sqlQuery("drop function using jar '%s'", defaultBinaryJar)
          .unOrdered()
          .baselineColumns("ok", "summary")
          .baselineValues(false, String.format(summary, defaultBinaryJar))
          .go();

    } finally {
      // Unblock the background registration and wait for it to finish.
      latch1.countDown();
      thread.join();
    }
  }
| |
  /**
   * Two concurrent registrations of jars that contain the SAME function: the
   * first must win, the second must fail with a duplicate-function error after
   * retrying against the updated registry.
   *
   * Latch choreography: thread1 enters updateRegistry first (latch3 released,
   * parks on latch1); thread2's attempt releases latch1 and waits on latch2;
   * thread1 then completes the real update and releases latch2, forcing
   * thread2 to retry against a registry that already contains the function.
   */
  @Test
  public void testConcurrentRemoteRegistryUpdateWithDuplicates() throws Exception {
    RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();

    final CountDownLatch latch1 = new CountDownLatch(1);
    final CountDownLatch latch2 = new CountDownLatch(1);
    final CountDownLatch latch3 = new CountDownLatch(1);

    doAnswer(invocation -> {
      latch3.countDown();
      latch1.await();
      invocation.callRealMethod();
      latch2.countDown();
      return null;
    }).doAnswer(invocation -> {
      latch1.countDown();
      latch2.await();
      invocation.callRealMethod();
      return null;
    })
        .when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));

    final String jar1 = defaultBinaryJar;
    copyDefaultJarsToStagingArea();

    // Second jar packs the same function class under a different name.
    final String copyJarName = "drill-custom-lower-copy";
    final String jar2 = buildAndCopyJarsToStagingArea(copyJarName, "**/CustomLowerFunction.java", null);

    final String query = "create function using jar '%s'";

    Thread thread1 = new Thread(new TestBuilderRunner(
        testBuilder()
            .sqlQuery(query, jar1)
            .unOrdered()
            .baselineColumns("ok", "summary")
            .baselineValues(true,
                String.format("The following UDFs in jar %s have been registered:\n" +
                    "[custom_lower(VARCHAR-REQUIRED)]", jar1))
    ));

    Thread thread2 = new Thread(new TestBuilderRunner(
        testBuilder()
            .sqlQuery(query, jar2)
            .unOrdered()
            .baselineColumns("ok", "summary")
            .baselineValues(false,
                String.format("Found duplicated function in %s: custom_lower(VARCHAR-REQUIRED)", jar1))
    ));

    // Ensure thread1 reaches updateRegistry before thread2 starts.
    thread1.start();
    latch3.await();
    thread2.start();

    thread1.join();
    thread2.join();

    DataChangeVersion version = new DataChangeVersion();
    Registry registry = remoteFunctionRegistry.getRegistry(version);
    assertEquals("Remote registry version should match", 2, version.getVersion());
    List<Jar> jarList = registry.getJarList();
    assertEquals("Only one jar should be registered", 1, jarList.size());
    assertEquals("Jar name should match", jar1, jarList.get(0).getName());

    verify(remoteFunctionRegistry, times(2)).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
  }
| |
| @Test |
| public void testConcurrentRemoteRegistryUpdateForDifferentJars() throws Exception { |
| RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry(); |
| final CountDownLatch latch1 = new CountDownLatch(1); |
| final CountDownLatch latch2 = new CountDownLatch(2); |
| |
| doAnswer(invocation -> { |
| latch2.countDown(); |
| latch1.await(); |
| invocation.callRealMethod(); |
| return null; |
| }) |
| .when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class)); |
| |
| final String jar1 = defaultBinaryJar; |
| copyDefaultJarsToStagingArea(); |
| |
| final String upperJarName = "drill-custom-upper"; |
| final String jar2 = buildAndCopyJarsToStagingArea(upperJarName, "**/CustomUpperFunction.java", null); |
| |
| final String query = "create function using jar '%s'"; |
| |
| Thread thread1 = new Thread(new TestBuilderRunner( |
| testBuilder() |
| .sqlQuery(query, jar1) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(true, |
| String.format("The following UDFs in jar %s have been registered:\n" + |
| "[custom_lower(VARCHAR-REQUIRED)]", jar1)) |
| )); |
| |
| |
| Thread thread2 = new Thread(new TestBuilderRunner( |
| testBuilder() |
| .sqlQuery(query, jar2) |
| .unOrdered() |
| .baselineColumns("ok", "summary") |
| .baselineValues(true, String.format("The following UDFs in jar %s have been registered:\n" + |
| "[custom_upper(VARCHAR-REQUIRED)]", jar2)) |
| )); |
| |
| thread1.start(); |
| thread2.start(); |
| |
| latch2.await(); |
| latch1.countDown(); |
| |
| thread1.join(); |
| thread2.join(); |
| |
| DataChangeVersion version = new DataChangeVersion(); |
| Registry registry = remoteFunctionRegistry.getRegistry(version); |
| assertEquals("Remote registry version should match", 3, version.getVersion()); |
| |
| List<Jar> actualJars = registry.getJarList(); |
| List<String> expectedJars = Lists.newArrayList(jar1, jar2); |
| |
| assertEquals("Only one jar should be registered", 2, actualJars.size()); |
| for (Jar jar : actualJars) { |
| assertTrue("Jar should be present in remote function registry", expectedJars.contains(jar.getName())); |
| } |
| |
| verify(remoteFunctionRegistry, times(3)).updateRegistry(any(Registry.class), any(DataChangeVersion.class)); |
| } |
| |
  @Test
  public void testLazyInitConcurrent() throws Exception {
    // Verifies that two queries which concurrently trigger lazy registry
    // synchronization both complete successfully without a double reload.
    FunctionImplementationRegistry functionImplementationRegistry = spyFunctionImplementationRegistry();
    copyDefaultJarsToStagingArea();
    test("create function using jar '%s'", defaultBinaryJar);

    final CountDownLatch latch1 = new CountDownLatch(1);
    final CountDownLatch latch2 = new CountDownLatch(1);

    final String query = "select custom_lower('A') from (values(1))";

    // First invocation of syncWithRemoteRegistry(): wait until the second
    // invocation has started (latch1), run the real sync, then release the
    // second invocation (latch2). This forces the two syncs to overlap.
    doAnswer(invocation -> {
      latch1.await();
      boolean result = (boolean) invocation.callRealMethod();
      assertTrue("syncWithRemoteRegistry() should return true", result);
      latch2.countDown();
      return true;
    })
    // Second invocation: unblock the first (latch1), wait for it to finish
    // (latch2), then perform its own real sync.
    .doAnswer(invocation -> {
      latch1.countDown();
      latch2.await();
      boolean result = (boolean) invocation.callRealMethod();
      assertTrue("syncWithRemoteRegistry() should return true", result);
      return true;
    })
    .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());

    // Same runner instance is shared; each thread issues the same query.
    SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query);
    Thread thread1 = new Thread(simpleQueryRunner);
    Thread thread2 = new Thread(simpleQueryRunner);

    thread1.start();
    thread2.start();

    latch2.await();
    latch1.countDown();

    thread1.join();
    thread2.join();

    // Both threads must have attempted a sync, and the local registry must
    // end up at version 2 — i.e. exactly one effective reload happened.
    verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyInt());
    LocalFunctionRegistry localFunctionRegistry = (LocalFunctionRegistry)FieldUtils.readField(
        functionImplementationRegistry, "localFunctionRegistry", true);
    assertEquals("Sync function registry version should match", 2, localFunctionRegistry.getVersion());
  }
| |
| @Test |
| public void testLazyInitNoReload() throws Exception { |
| FunctionImplementationRegistry functionImplementationRegistry = spyFunctionImplementationRegistry(); |
| copyDefaultJarsToStagingArea(); |
| test("create function using jar '%s'", defaultBinaryJar); |
| |
| doAnswer(invocation -> { |
| boolean result = (boolean) invocation.callRealMethod(); |
| assertTrue("syncWithRemoteRegistry() should return true", result); |
| return true; |
| }) |
| .doAnswer(invocation -> { |
| boolean result = (boolean) invocation.callRealMethod(); |
| assertFalse("syncWithRemoteRegistry() should return false", result); |
| return false; |
| }) |
| .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt()); |
| |
| test("select custom_lower('A') from (values(1))"); |
| |
| try { |
| test("select unknown_lower('A') from (values(1))"); |
| fail(); |
| } catch (UserRemoteException e){ |
| assertThat(e.getMessage(), containsString("No match found for function signature unknown_lower(<CHARACTER>)")); |
| } |
| |
| verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyInt()); |
| LocalFunctionRegistry localFunctionRegistry = (LocalFunctionRegistry)FieldUtils.readField( |
| functionImplementationRegistry, "localFunctionRegistry", true); |
| assertEquals("Sync function registry version should match", 2, localFunctionRegistry.getVersion()); |
| } |
| |
| private static String buildJars(String jarName, String includeFiles, String includeResources) { |
| return jarBuilder.build(jarName, buildDirectory.getAbsolutePath(), includeFiles, includeResources); |
| } |
| |
| private void copyDefaultJarsToStagingArea() throws IOException { |
| copyJarsToStagingArea(jarsDir.toPath(), defaultBinaryJar, defaultSourceJar); |
| } |
| |
| private String buildAndCopyJarsToStagingArea(String jarName, String includeFiles, String includeResources) throws IOException { |
| String binaryJar = buildJars(jarName, includeFiles, includeResources); |
| copyJarsToStagingArea(buildDirectory.toPath(), binaryJar, JarUtil.getSourceName(binaryJar)); |
| return binaryJar; |
| } |
| |
| private void copyJarsToStagingArea(Path src, String binaryName, String sourceName) throws IOException { |
| RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry(); |
| |
| final Path path = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea()); |
| |
| copyJar(src, path, binaryName); |
| copyJar(src, path, sourceName); |
| } |
| |
| private void copyJar(Path src, Path dest, String name) throws IOException { |
| final File destFile = dest.resolve(name).toFile(); |
| FileUtils.deleteQuietly(destFile); |
| FileUtils.copyFile(src.resolve(name).toFile(), destFile); |
| } |
| |
| private RemoteFunctionRegistry spyRemoteFunctionRegistry() throws IllegalAccessException { |
| FunctionImplementationRegistry functionImplementationRegistry = |
| getDrillbitContext().getFunctionImplementationRegistry(); |
| RemoteFunctionRegistry remoteFunctionRegistry = functionImplementationRegistry.getRemoteFunctionRegistry(); |
| RemoteFunctionRegistry spy = spy(remoteFunctionRegistry); |
| FieldUtils.writeField(functionImplementationRegistry, "remoteFunctionRegistry", spy, true); |
| return spy; |
| } |
| |
| private FunctionImplementationRegistry spyFunctionImplementationRegistry() throws IllegalAccessException { |
| DrillbitContext drillbitContext = getDrillbitContext(); |
| FunctionImplementationRegistry spy = spy(drillbitContext.getFunctionImplementationRegistry()); |
| FieldUtils.writeField(drillbitContext, "functionRegistry", spy, true); |
| return spy; |
| } |
| |
| private static class SimpleQueryRunner implements Runnable { |
| |
| private final String query; |
| |
| SimpleQueryRunner(String query) { |
| this.query = query; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| test(query); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private static class TestBuilderRunner implements Runnable { |
| |
| private final TestBuilder testBuilder; |
| |
| TestBuilderRunner(TestBuilder testBuilder) { |
| this.testBuilder = testBuilder; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| testBuilder.go(); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |