blob: 155676850507ea413088e40680d22407d4ff6dd0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.udf.dynamic;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.categories.SqlFunctionTest;
import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.registry.LocalFunctionRegistry;
import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.proto.UserBitShared.Jar;
import org.apache.drill.exec.proto.UserBitShared.Registry;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.TestBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static org.apache.drill.test.HadoopUtils.hadoopToJavaPath;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@Category({SlowTest.class, SqlFunctionTest.class})
public class TestDynamicUDFSupport extends BaseTestQuery {
private static final String DEFAULT_JAR_NAME = "drill-custom-lower";
private static URI fsUri;
private static File jarsDir;
private static File buildDirectory;
private static JarBuilder jarBuilder;
private static String defaultBinaryJar;
private static String defaultSourceJar;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void buildAndStoreDefaultJars() throws IOException {
jarsDir = dirTestWatcher.makeSubDir(Paths.get("jars"));
buildDirectory = dirTestWatcher.makeSubDir(Paths.get("drill-udf"));
jarBuilder = new JarBuilder("src/test/resources/drill-udf");
defaultBinaryJar = buildJars(DEFAULT_JAR_NAME, "**/CustomLowerFunction.java", null);
defaultSourceJar = JarUtil.getSourceName(defaultBinaryJar);
FileUtils.copyFileToDirectory(new File(buildDirectory, defaultBinaryJar), jarsDir);
FileUtils.copyFileToDirectory(new File(buildDirectory, defaultSourceJar), jarsDir);
}
@Before
public void setupNewDrillbit() throws Exception {
updateTestCluster(1, config);
fsUri = getLocalFileSystem().getUri();
}
@After
public void cleanup() throws Exception {
closeClient();
dirTestWatcher.clear();
}
@Test
public void testSyntax() throws Exception {
test("create function using jar 'jar_name.jar'");
test("drop function using jar 'jar_name.jar'");
}
@Test
public void testEnableDynamicSupport() throws Exception {
try {
test("alter system set `exec.udf.enable_dynamic_support` = true");
test("create function using jar 'jar_name.jar'");
test("drop function using jar 'jar_name.jar'");
} finally {
test("alter system reset `exec.udf.enable_dynamic_support`");
}
}
@Test
public void testDisableDynamicSupportCreate() throws Exception {
try {
test("alter system set `exec.udf.enable_dynamic_support` = false");
String query = "create function using jar 'jar_name.jar'";
thrown.expect(UserRemoteException.class);
thrown.expectMessage(containsString("Dynamic UDFs support is disabled."));
test(query);
} finally {
test("alter system reset `exec.udf.enable_dynamic_support`");
}
}
@Test
public void testDisableDynamicSupportDrop() throws Exception {
try {
test("alter system set `exec.udf.enable_dynamic_support` = false");
String query = "drop function using jar 'jar_name.jar'";
thrown.expect(UserRemoteException.class);
thrown.expectMessage(containsString("Dynamic UDFs support is disabled."));
test(query);
} finally {
test("alter system reset `exec.udf.enable_dynamic_support`");
}
}
@Test
public void testAbsentBinaryInStaging() throws Exception {
Path staging = hadoopToJavaPath(getDrillbitContext().getRemoteFunctionRegistry().getStagingArea());
String summary = String.format("File %s does not exist on file system %s",
staging.resolve(defaultBinaryJar).toUri().getPath(), fsUri);
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, summary)
.go();
}
@Test
public void testAbsentSourceInStaging() throws Exception {
Path staging = hadoopToJavaPath(getDrillbitContext().getRemoteFunctionRegistry().getStagingArea());
copyJar(jarsDir.toPath(), staging, defaultBinaryJar);
String summary = String.format("File %s does not exist on file system %s",
staging.resolve(defaultSourceJar).toUri().getPath(), fsUri);
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, summary)
.go();
}
@Test
public void testJarWithoutMarkerFile() throws Exception {
String jarName = "drill-no-marker";
String jar = buildAndCopyJarsToStagingArea(jarName, null, "**/dummy.conf");
String summary = "Marker file %s is missing in %s";
testBuilder()
.sqlQuery("create function using jar '%s'", jar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary,
ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jar))
.go();
}
@Test
public void testJarWithoutFunctions() throws Exception {
String jarName = "drill-no-functions";
String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomLowerDummyFunction.java", null);
String summary = "Jar %s does not contain functions";
testBuilder()
.sqlQuery("create function using jar '%s'", jar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, jar))
.go();
}
@Test
public void testSuccessfulRegistration() throws Exception {
copyDefaultJarsToStagingArea();
String summary = "The following UDFs in jar %s have been registered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]";
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true, String.format(summary, defaultBinaryJar))
.go();
RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry();
FileSystem fs = remoteFunctionRegistry.getFs();
assertFalse("Staging area should be empty", fs.listFiles(remoteFunctionRegistry.getStagingArea(), false).hasNext());
assertFalse("Temporary area should be empty", fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext());
Path path = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
assertTrue("Binary should be present in registry area",
path.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should be present in registry area",
path.resolve(defaultBinaryJar).toFile().exists());
Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
assertEquals(registry.getJar(0).getName(), defaultBinaryJar);
}
@Test
public void testDuplicatedJarInRemoteRegistry() throws Exception {
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
copyDefaultJarsToStagingArea();
String summary = "Jar with %s name has been already registered";
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, defaultBinaryJar))
.go();
}
@Test
public void testDuplicatedJarInLocalRegistry() throws Exception {
String jarName = "drill-custom-upper";
String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomUpperFunction.java", null);
test("create function using jar '%s'", jar);
test("select custom_upper('A') from (values(1))");
copyJarsToStagingArea(buildDirectory.toPath(), jar,JarUtil.getSourceName(jar));
String summary = "Jar with %s name has been already registered";
testBuilder()
.sqlQuery("create function using jar '%s'", jar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, jar))
.go();
}
@Test
public void testDuplicatedFunctionsInRemoteRegistry() throws Exception {
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
String jarName = "drill-custom-lower-copy";
String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomLowerFunction.java", null);
String summary = "Found duplicated function in %s: custom_lower(VARCHAR-REQUIRED)";
testBuilder()
.sqlQuery("create function using jar '%s'", jar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, defaultBinaryJar))
.go();
}
@Test
public void testDuplicatedFunctionsInLocalRegistry() throws Exception {
String jarName = "drill-lower";
String jar = buildAndCopyJarsToStagingArea(jarName, "**/LowerFunction.java", null);
String summary = "Found duplicated function in %s: lower(VARCHAR-REQUIRED)";
testBuilder()
.sqlQuery("create function using jar '%s'", jar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, LocalFunctionRegistry.BUILT_IN))
.go();
}
@Test
public void testSuccessfulRegistrationAfterSeveralRetryAttempts() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
Path stagingPath = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
Path tmpPath = hadoopToJavaPath(remoteFunctionRegistry.getTmpArea());
copyDefaultJarsToStagingArea();
doThrow(new VersionMismatchException("Version mismatch detected", 1))
.doThrow(new VersionMismatchException("Version mismatch detected", 1))
.doCallRealMethod()
.when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
String summary = "The following UDFs in jar %s have been registered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]";
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true, String.format(summary, defaultBinaryJar))
.go();
verify(remoteFunctionRegistry, times(3))
.updateRegistry(any(Registry.class), any(DataChangeVersion.class));
assertTrue("Staging area should be empty", ArrayUtils.isEmpty(stagingPath.toFile().listFiles()));
assertTrue("Temporary area should be empty", ArrayUtils.isEmpty(tmpPath.toFile().listFiles()));
assertTrue("Binary should be present in registry area",
registryPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should be present in registry area",
registryPath.resolve(defaultSourceJar).toFile().exists());
Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
assertEquals(registry.getJar(0).getName(), defaultBinaryJar);
}
@Test
public void testSuccessfulUnregistrationAfterSeveralRetryAttempts() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
reset(remoteFunctionRegistry);
doThrow(new VersionMismatchException("Version mismatch detected", 1))
.doThrow(new VersionMismatchException("Version mismatch detected", 1))
.doCallRealMethod()
.when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
String summary = "The following UDFs in jar %s have been unregistered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]";
testBuilder()
.sqlQuery("drop function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true, String.format(summary, defaultBinaryJar))
.go();
verify(remoteFunctionRegistry, times(3))
.updateRegistry(any(Registry.class), any(DataChangeVersion.class));
FileSystem fs = remoteFunctionRegistry.getFs();
assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
assertEquals("Registry should be empty",
remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
}
@Test
public void testExceedRetryAttemptsDuringRegistration() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
Path stagingPath = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
Path tmpPath = hadoopToJavaPath(remoteFunctionRegistry.getTmpArea());
copyDefaultJarsToStagingArea();
doThrow(new VersionMismatchException("Version mismatch detected", 1))
.when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
String summary = "Failed to update remote function registry. Exceeded retry attempts limit.";
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, summary)
.go();
verify(remoteFunctionRegistry, times(remoteFunctionRegistry.getRetryAttempts() + 1))
.updateRegistry(any(Registry.class), any(DataChangeVersion.class));
assertTrue("Binary should be present in staging area",
stagingPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should be present in staging area",
stagingPath.resolve(defaultSourceJar).toFile().exists());
assertTrue("Registry area should be empty", ArrayUtils.isEmpty(registryPath.toFile().listFiles()));
assertTrue("Temporary area should be empty", ArrayUtils.isEmpty(tmpPath.toFile().listFiles()));
assertEquals("Registry should be empty",
remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
}
@Test
public void testExceedRetryAttemptsDuringUnregistration() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
reset(remoteFunctionRegistry);
doThrow(new VersionMismatchException("Version mismatch detected", 1))
.when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
String summary = "Failed to update remote function registry. Exceeded retry attempts limit.";
testBuilder()
.sqlQuery("drop function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, summary)
.go();
verify(remoteFunctionRegistry, times(remoteFunctionRegistry.getRetryAttempts() + 1))
.updateRegistry(any(Registry.class), any(DataChangeVersion.class));
assertTrue("Binary should be present in registry area",
registryPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should be present in registry area",
registryPath.resolve(defaultSourceJar).toFile().exists());
Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
assertEquals(registry.getJar(0).getName(), defaultBinaryJar);
}
@Test
public void testLazyInit() throws Exception {
thrown.expect(UserRemoteException.class);
thrown.expectMessage(containsString("No match found for function signature custom_lower(<CHARACTER>)"));
test("select custom_lower('A') from (values(1))");
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
testBuilder()
.sqlQuery("select custom_lower('A') as res from (values(1))")
.unOrdered()
.baselineColumns("res")
.baselineValues("a")
.go();
Path localUdfDirPath = hadoopToJavaPath((org.apache.hadoop.fs.Path)FieldUtils.readField(
getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir", true));
assertTrue("Binary should exist in local udf directory",
localUdfDirPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should exist in local udf directory",
localUdfDirPath.resolve(defaultSourceJar).toFile().exists());
}
@Test
public void testLazyInitWhenDynamicUdfSupportIsDisabled() throws Exception {
thrown.expect(UserRemoteException.class);
thrown.expectMessage(containsString("No match found for function signature custom_lower(<CHARACTER>)"));
test("select custom_lower('A') from (values(1))");
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
try {
testBuilder()
.sqlQuery("select custom_lower('A') as res from (values(1))")
.optionSettingQueriesForTestQuery("alter system set `exec.udf.enable_dynamic_support` = false")
.unOrdered()
.baselineColumns("res")
.baselineValues("a")
.go();
} finally {
test("alter system reset `exec.udf.enable_dynamic_support`");
}
}
@Test
public void testOverloadedFunctionPlanningStage() throws Exception {
String jarName = "drill-custom-abs";
String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomAbsFunction.java", null);
test("create function using jar '%s'", jar);
testBuilder()
.sqlQuery("select abs('A', 'A') as res from (values(1))")
.unOrdered()
.baselineColumns("res")
.baselineValues("ABS was overloaded. Input: A, A")
.go();
}
@Test
public void testOverloadedFunctionExecutionStage() throws Exception {
String jarName = "drill-custom-log";
String jar = buildAndCopyJarsToStagingArea(jarName, "**/CustomLogFunction.java", null);
test("create function using jar '%s'", jar);
testBuilder()
.sqlQuery("select log('A') as res from (values(1))")
.unOrdered()
.baselineColumns("res")
.baselineValues("LOG was overloaded. Input: A")
.go();
}
@Test
public void testDropFunction() throws Exception {
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
test("select custom_lower('A') from (values(1))");
Path localUdfDirPath = hadoopToJavaPath((org.apache.hadoop.fs.Path)FieldUtils.readField(
getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir", true));
assertTrue("Binary should exist in local udf directory",
localUdfDirPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should exist in local udf directory",
localUdfDirPath.resolve(defaultSourceJar).toFile().exists());
String summary = "The following UDFs in jar %s have been unregistered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]";
testBuilder()
.sqlQuery("drop function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true, String.format(summary, defaultBinaryJar))
.go();
try {
test("select custom_lower('A') from (values(1))");
fail();
} catch (UserRemoteException e) {
assertTrue(e.getMessage().contains("No match found for function signature custom_lower(<CHARACTER>)"));
}
RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry();
Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
assertEquals("Remote registry should be empty",
remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
assertFalse("Binary should not be present in registry area",
registryPath.resolve(defaultBinaryJar).toFile().exists());
assertFalse("Source should not be present in registry area",
registryPath.resolve(defaultSourceJar).toFile().exists());
assertFalse("Binary should not be present in local udf directory",
localUdfDirPath.resolve(defaultBinaryJar).toFile().exists());
assertFalse("Source should not be present in local udf directory",
localUdfDirPath.resolve(defaultSourceJar).toFile().exists());
}
@Test
public void testReRegisterTheSameJarWithDifferentContent() throws Exception {
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
testBuilder()
.sqlQuery("select custom_lower('A') as res from (values(1))")
.unOrdered()
.baselineColumns("res")
.baselineValues("a")
.go();
test("drop function using jar '%s'", defaultBinaryJar);
Thread.sleep(1000);
buildAndCopyJarsToStagingArea(DEFAULT_JAR_NAME, "**/CustomLowerFunctionV2.java", null);
test("create function using jar '%s'", defaultBinaryJar);
testBuilder()
.sqlQuery("select custom_lower('A') as res from (values(1))")
.unOrdered()
.baselineColumns("res")
.baselineValues("a_v2")
.go();
}
@Test
public void testDropAbsentJar() throws Exception {
String summary = "Jar %s is not registered in remote registry";
testBuilder()
.sqlQuery("drop function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, defaultBinaryJar))
.go();
}
@Test
public void testRegistrationFailDuringRegistryUpdate() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
Path registryPath = hadoopToJavaPath(remoteFunctionRegistry.getRegistryArea());
Path stagingPath = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
Path tmpPath = hadoopToJavaPath(remoteFunctionRegistry.getTmpArea());
final String errorMessage = "Failure during remote registry update.";
doAnswer(invocation -> {
assertTrue("Binary should be present in registry area",
registryPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should be present in registry area",
registryPath.resolve(defaultSourceJar).toFile().exists());
throw new RuntimeException(errorMessage);
}).when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
copyDefaultJarsToStagingArea();
testBuilder()
.sqlQuery("create function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, errorMessage)
.go();
assertTrue("Registry area should be empty", ArrayUtils.isEmpty(registryPath.toFile().listFiles()));
assertTrue("Temporary area should be empty", ArrayUtils.isEmpty(tmpPath.toFile().listFiles()));
assertTrue("Binary should be present in staging area", stagingPath.resolve(defaultBinaryJar).toFile().exists());
assertTrue("Source should be present in staging area", stagingPath.resolve(defaultSourceJar).toFile().exists());
}
@Test
public void testConcurrentRegistrationOfTheSameJar() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
doAnswer(invocation -> {
String result = (String) invocation.callRealMethod();
latch2.countDown();
latch1.await();
return result;
})
.doCallRealMethod()
.doCallRealMethod()
.when(remoteFunctionRegistry).addToJars(anyString(), any(RemoteFunctionRegistry.Action.class));
final String query = String.format("create function using jar '%s'", defaultBinaryJar);
Thread thread = new Thread(new SimpleQueryRunner(query));
thread.start();
latch2.await();
try {
String summary = "Jar with %s name is used. Action: REGISTRATION";
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, defaultBinaryJar))
.go();
testBuilder()
.sqlQuery("drop function using jar '%s'", defaultBinaryJar)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary, defaultBinaryJar))
.go();
} finally {
latch1.countDown();
thread.join();
}
}
@Test
public void testConcurrentRemoteRegistryUpdateWithDuplicates() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch3 = new CountDownLatch(1);
doAnswer(invocation -> {
latch3.countDown();
latch1.await();
invocation.callRealMethod();
latch2.countDown();
return null;
}).doAnswer(invocation -> {
latch1.countDown();
latch2.await();
invocation.callRealMethod();
return null;
})
.when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
final String jar1 = defaultBinaryJar;
copyDefaultJarsToStagingArea();
final String copyJarName = "drill-custom-lower-copy";
final String jar2 = buildAndCopyJarsToStagingArea(copyJarName, "**/CustomLowerFunction.java", null);
final String query = "create function using jar '%s'";
Thread thread1 = new Thread(new TestBuilderRunner(
testBuilder()
.sqlQuery(query, jar1)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true,
String.format("The following UDFs in jar %s have been registered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]", jar1))
));
Thread thread2 = new Thread(new TestBuilderRunner(
testBuilder()
.sqlQuery(query, jar2)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false,
String.format("Found duplicated function in %s: custom_lower(VARCHAR-REQUIRED)", jar1))
));
thread1.start();
latch3.await();
thread2.start();
thread1.join();
thread2.join();
DataChangeVersion version = new DataChangeVersion();
Registry registry = remoteFunctionRegistry.getRegistry(version);
assertEquals("Remote registry version should match", 2, version.getVersion());
List<Jar> jarList = registry.getJarList();
assertEquals("Only one jar should be registered", 1, jarList.size());
assertEquals("Jar name should match", jar1, jarList.get(0).getName());
verify(remoteFunctionRegistry, times(2)).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
}
@Test
public void testConcurrentRemoteRegistryUpdateForDifferentJars() throws Exception {
RemoteFunctionRegistry remoteFunctionRegistry = spyRemoteFunctionRegistry();
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(2);
doAnswer(invocation -> {
latch2.countDown();
latch1.await();
invocation.callRealMethod();
return null;
})
.when(remoteFunctionRegistry).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
final String jar1 = defaultBinaryJar;
copyDefaultJarsToStagingArea();
final String upperJarName = "drill-custom-upper";
final String jar2 = buildAndCopyJarsToStagingArea(upperJarName, "**/CustomUpperFunction.java", null);
final String query = "create function using jar '%s'";
Thread thread1 = new Thread(new TestBuilderRunner(
testBuilder()
.sqlQuery(query, jar1)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true,
String.format("The following UDFs in jar %s have been registered:\n" +
"[custom_lower(VARCHAR-REQUIRED)]", jar1))
));
Thread thread2 = new Thread(new TestBuilderRunner(
testBuilder()
.sqlQuery(query, jar2)
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(true, String.format("The following UDFs in jar %s have been registered:\n" +
"[custom_upper(VARCHAR-REQUIRED)]", jar2))
));
thread1.start();
thread2.start();
latch2.await();
latch1.countDown();
thread1.join();
thread2.join();
DataChangeVersion version = new DataChangeVersion();
Registry registry = remoteFunctionRegistry.getRegistry(version);
assertEquals("Remote registry version should match", 3, version.getVersion());
List<Jar> actualJars = registry.getJarList();
List<String> expectedJars = Lists.newArrayList(jar1, jar2);
assertEquals("Only one jar should be registered", 2, actualJars.size());
for (Jar jar : actualJars) {
assertTrue("Jar should be present in remote function registry", expectedJars.contains(jar.getName()));
}
verify(remoteFunctionRegistry, times(3)).updateRegistry(any(Registry.class), any(DataChangeVersion.class));
}
@Test
public void testLazyInitConcurrent() throws Exception {
FunctionImplementationRegistry functionImplementationRegistry = spyFunctionImplementationRegistry();
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final String query = "select custom_lower('A') from (values(1))";
doAnswer(invocation -> {
latch1.await();
boolean result = (boolean) invocation.callRealMethod();
assertTrue("syncWithRemoteRegistry() should return true", result);
latch2.countDown();
return true;
})
.doAnswer(invocation -> {
latch1.countDown();
latch2.await();
boolean result = (boolean) invocation.callRealMethod();
assertTrue("syncWithRemoteRegistry() should return true", result);
return true;
})
.when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());
SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query);
Thread thread1 = new Thread(simpleQueryRunner);
Thread thread2 = new Thread(simpleQueryRunner);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyInt());
LocalFunctionRegistry localFunctionRegistry = (LocalFunctionRegistry)FieldUtils.readField(
functionImplementationRegistry, "localFunctionRegistry", true);
assertEquals("Sync function registry version should match", 2, localFunctionRegistry.getVersion());
}
@Test
public void testLazyInitNoReload() throws Exception {
FunctionImplementationRegistry functionImplementationRegistry = spyFunctionImplementationRegistry();
copyDefaultJarsToStagingArea();
test("create function using jar '%s'", defaultBinaryJar);
doAnswer(invocation -> {
boolean result = (boolean) invocation.callRealMethod();
assertTrue("syncWithRemoteRegistry() should return true", result);
return true;
})
.doAnswer(invocation -> {
boolean result = (boolean) invocation.callRealMethod();
assertFalse("syncWithRemoteRegistry() should return false", result);
return false;
})
.when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());
test("select custom_lower('A') from (values(1))");
try {
test("select unknown_lower('A') from (values(1))");
fail();
} catch (UserRemoteException e){
assertThat(e.getMessage(), containsString("No match found for function signature unknown_lower(<CHARACTER>)"));
}
verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyInt());
LocalFunctionRegistry localFunctionRegistry = (LocalFunctionRegistry)FieldUtils.readField(
functionImplementationRegistry, "localFunctionRegistry", true);
assertEquals("Sync function registry version should match", 2, localFunctionRegistry.getVersion());
}
private static String buildJars(String jarName, String includeFiles, String includeResources) {
return jarBuilder.build(jarName, buildDirectory.getAbsolutePath(), includeFiles, includeResources);
}
private void copyDefaultJarsToStagingArea() throws IOException {
copyJarsToStagingArea(jarsDir.toPath(), defaultBinaryJar, defaultSourceJar);
}
private String buildAndCopyJarsToStagingArea(String jarName, String includeFiles, String includeResources) throws IOException {
String binaryJar = buildJars(jarName, includeFiles, includeResources);
copyJarsToStagingArea(buildDirectory.toPath(), binaryJar, JarUtil.getSourceName(binaryJar));
return binaryJar;
}
private void copyJarsToStagingArea(Path src, String binaryName, String sourceName) throws IOException {
RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry();
final Path path = hadoopToJavaPath(remoteFunctionRegistry.getStagingArea());
copyJar(src, path, binaryName);
copyJar(src, path, sourceName);
}
private void copyJar(Path src, Path dest, String name) throws IOException {
final File destFile = dest.resolve(name).toFile();
FileUtils.deleteQuietly(destFile);
FileUtils.copyFile(src.resolve(name).toFile(), destFile);
}
private RemoteFunctionRegistry spyRemoteFunctionRegistry() throws IllegalAccessException {
FunctionImplementationRegistry functionImplementationRegistry =
getDrillbitContext().getFunctionImplementationRegistry();
RemoteFunctionRegistry remoteFunctionRegistry = functionImplementationRegistry.getRemoteFunctionRegistry();
RemoteFunctionRegistry spy = spy(remoteFunctionRegistry);
FieldUtils.writeField(functionImplementationRegistry, "remoteFunctionRegistry", spy, true);
return spy;
}
private FunctionImplementationRegistry spyFunctionImplementationRegistry() throws IllegalAccessException {
DrillbitContext drillbitContext = getDrillbitContext();
FunctionImplementationRegistry spy = spy(drillbitContext.getFunctionImplementationRegistry());
FieldUtils.writeField(drillbitContext, "functionRegistry", spy, true);
return spy;
}
private static class SimpleQueryRunner implements Runnable {
private final String query;
SimpleQueryRunner(String query) {
this.query = query;
}
@Override
public void run() {
try {
test(query);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private static class TestBuilderRunner implements Runnable {
private final TestBuilder testBuilder;
TestBuilderRunner(TestBuilder testBuilder) {
this.testBuilder = testBuilder;
}
@Override
public void run() {
try {
testBuilder.go();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}