blob: 0c9fd7905f0f8f3204e3a086d63efaeee2b1a46b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.handlers;
import com.google.common.collect.Lists;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.FileUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.FunctionValidationException;
import org.apache.drill.exec.exception.JarValidationException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction;
import org.apache.drill.exec.proto.UserBitShared.Jar;
import org.apache.drill.exec.proto.UserBitShared.Registry;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
 * Handles {@link SqlCreateFunction} statements: registers the UDFs contained in a jar
 * (binary plus source) that has been placed into the remote registry staging area.
 */
public class CreateFunctionHandler extends DefaultSqlHandler {

  private static final Logger logger = LoggerFactory.getLogger(CreateFunctionHandler.class);

  public CreateFunctionHandler(SqlHandlerConfig config) {
    super(config);
  }

  /**
   * Registers UDFs dynamically. The process consists of several steps:
   * <ol>
   * <li>Registering the jar name in the jar registry to ensure that several jars with the same name
   * are not registered at the same time.</li>
   * <li>Binary and source jars validation and back up.</li>
   * <li>Validation against local function registry.</li>
   * <li>Validation against remote function registry.</li>
   * <li>Remote function registry update.</li>
   * <li>Copying of jars to registry area and clean up.</li>
   * </ol>
   *
   * UDFs registration is allowed only if dynamic UDFs support is enabled.
   *
   * @param sqlNode parsed statement, expected to unwrap to {@link SqlCreateFunction}
   * @return single row indicating list of registered UDFs, or error message otherwise
   * @throws ForemanSetupException if the node cannot be unwrapped to {@link SqlCreateFunction}
   * @throws IOException declared by the handler contract
   */
  @Override
  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
    if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
      throw UserException.validationError()
          .message("Dynamic UDFs support is disabled.")
          .build(logger);
    }

    RemoteFunctionRegistry remoteRegistry = context.getRemoteFunctionRegistry();
    JarManager jarManager = new JarManager(sqlNode, remoteRegistry);

    // Becomes true only after addToJars succeeded for this handler, so the finally block
    // never removes a jar name that was added by some other action.
    boolean inProgress = false;
    try {
      final String action = remoteRegistry.addToJars(jarManager.getBinaryName(), RemoteFunctionRegistry.Action.REGISTRATION);
      if (action != null) {
        // another action currently uses a jar with this name
        return DirectPlan.createDirectPlan(context, false,
            String.format("Jar with %s name is used. Action: %s", jarManager.getBinaryName(), action));
      }
      inProgress = true;

      jarManager.initRemoteBackup();
      List<String> functions = validateAgainstLocalRegistry(jarManager, context.getFunctionRegistry());
      initRemoteRegistration(functions, jarManager, remoteRegistry);
      jarManager.deleteQuietlyFromStagingArea();

      return DirectPlan.createDirectPlan(context, true,
          String.format("The following UDFs in jar %s have been registered:\n%s", jarManager.getBinaryName(), functions));
    } catch (Exception e) {
      logger.error("Error during UDF registration", e);
      return DirectPlan.createDirectPlan(context, false, e.getMessage());
    } finally {
      // nested try/finally: temporary folders are removed even if releasing the jar name fails
      try {
        if (inProgress) {
          remoteRegistry.removeFromJars(jarManager.getBinaryName());
        }
      } finally {
        jarManager.cleanUp();
      }
    }
  }

  /**
   * Copies the binary jar to the local file system
   * and validates the functions from this jar against the local function registry.
   *
   * @param jarManager helps copying binary to local file system
   * @param localFunctionRegistry instance of local function registry used for local validation
   * @return list of validated function signatures
   * @throws IOException in case of problems during copying binary to local file system
   * @throws FunctionValidationException in case duplicated function was found
   */
  private List<String> validateAgainstLocalRegistry(JarManager jarManager,
                                                    FunctionImplementationRegistry localFunctionRegistry) throws IOException {
    Path localBinary = jarManager.copyBinaryToLocal();
    return localFunctionRegistry.validate(localBinary);
  }

  /**
   * Validates jar and its functions against remote jars.
   * For each remote jar, first checks for a duplicate jar name and then looks for duplicates among functions.
   *
   * @param remoteJars list of remote jars to validate against
   * @param jarName jar name to be validated
   * @param functions list of functions present in jar to be validated
   * @throws JarValidationException in case of jar with the same name was found
   * @throws FunctionValidationException in case duplicated function was found
   */
  private void validateAgainstRemoteRegistry(List<Jar> remoteJars, String jarName, List<String> functions) {
    // constant-time membership checks instead of rescanning the function list for every remote signature
    Set<String> functionSet = new HashSet<>(functions);
    for (Jar remoteJar : remoteJars) {
      if (remoteJar.getName().equals(jarName)) {
        throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
      }
      for (String remoteFunction : remoteJar.getFunctionSignatureList()) {
        if (functionSet.contains(remoteFunction)) {
          throw new FunctionValidationException(
              String.format("Found duplicated function in %s: %s", remoteJar.getName(), remoteFunction));
        }
      }
    }
  }

  /**
   * Initiates remote registration. First gets remote function registry with version.
   * Version is used to ensure that we update the same registry we validated against.
   * Then validates against list of remote jars.
   * If validation is successful, copies jars to registry area (only once) and starts updating remote function registry.
   * If during update {@link VersionMismatchException} was detected,
   * repeats the remote registration process until retry attempts are exhausted,
   * then throws an exception stating that the remote function registry could not be updated.
   * In case of any error, if jars have been already copied to registry area, they will be deleted.
   *
   * @param functions list of functions present in jar
   * @param jarManager helper class for copying jars to registry area
   * @param remoteRegistry remote function registry
   * @throws IOException in case of problems with copying jars to registry area
   */
  private void initRemoteRegistration(List<String> functions,
                                      JarManager jarManager,
                                      RemoteFunctionRegistry remoteRegistry) throws IOException {
    int retryAttempts = remoteRegistry.getRetryAttempts();
    boolean jarsCopied = false;
    try {
      while (retryAttempts >= 0) {
        DataChangeVersion version = new DataChangeVersion();
        List<Jar> remoteJars = remoteRegistry.getRegistry(version).getJarList();
        validateAgainstRemoteRegistry(remoteJars, jarManager.getBinaryName(), functions);
        if (!jarsCopied) {
          jarManager.copyToRegistryArea();
          jarsCopied = true;
        }
        List<Jar> jars = Lists.newArrayList(remoteJars);
        jars.add(Jar.newBuilder().setName(jarManager.getBinaryName()).addAllFunctionSignature(functions).build());
        Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
        try {
          remoteRegistry.updateRegistry(updatedRegistry, version);
          return;
        } catch (VersionMismatchException ex) {
          // registry changed since it was read; re-read and re-validate on the next iteration
          logger.debug("Failed to update function registry during registration, version mismatch was detected.", ex);
          retryAttempts--;
        }
      }
      throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit.");
    } catch (Exception e) {
      // copyToRegistryArea cleans up after itself when it fails, so only delete after a successful copy
      if (jarsCopied) {
        jarManager.deleteQuietlyFromRegistryArea();
      }
      throw e;
    }
  }

  /**
   * Inner helper class that encapsulates logic for working with jars.
   * During initialization it builds the paths to the staging, remote temporary and registry jars
   * and creates a local temporary directory.
   * Is responsible for validation, copying and deletion actions.
   */
  private class JarManager {

    private final String binaryName;
    private final FileSystem fs;
    private final Path remoteTmpDir;
    private final Path localTmpDir;
    private final Path stagingBinary;
    private final Path stagingSource;
    private final Path tmpRemoteBinary;
    private final Path tmpRemoteSource;
    private final Path registryBinary;
    private final Path registrySource;

    JarManager(SqlNode sqlNode, RemoteFunctionRegistry remoteRegistry) throws ForemanSetupException {
      SqlCreateFunction node = unwrap(sqlNode, SqlCreateFunction.class);
      this.binaryName = ((SqlCharStringLiteral) node.getJar()).toValue();
      String sourceName = JarUtil.getSourceName(binaryName);

      this.stagingBinary = new Path(remoteRegistry.getStagingArea(), binaryName);
      this.stagingSource = new Path(remoteRegistry.getStagingArea(), sourceName);

      // unique folder so concurrent registrations never share temporary files
      this.remoteTmpDir = new Path(remoteRegistry.getTmpArea(), UUID.randomUUID().toString());
      this.tmpRemoteBinary = new Path(remoteTmpDir, binaryName);
      this.tmpRemoteSource = new Path(remoteTmpDir, sourceName);

      this.registryBinary = new Path(remoteRegistry.getRegistryArea(), binaryName);
      this.registrySource = new Path(remoteRegistry.getRegistryArea(), sourceName);

      this.localTmpDir = new Path(DrillFileUtils.createTempDir().toURI());
      this.fs = remoteRegistry.getFs();
    }

    /**
     * @return binary jar name
     */
    String getBinaryName() {
      return binaryName;
    }

    /**
     * Validates that both binary and source jar are present in staging area,
     * it is expected that binary and source have standard naming convention.
     * Backs up both jars to unique folder in remote temporary area.
     *
     * @throws IOException in case of binary or source absence or problems during copying jars
     */
    void initRemoteBackup() throws IOException {
      checkPathExistence(stagingBinary);
      checkPathExistence(stagingSource);
      fs.mkdirs(remoteTmpDir);
      FileUtil.copy(fs, stagingBinary, fs, tmpRemoteBinary, false, true, fs.getConf());
      FileUtil.copy(fs, stagingSource, fs, tmpRemoteSource, false, true, fs.getConf());
    }

    /**
     * Copies binary jar to unique folder on local file system.
     * Source jar is not needed for local validation.
     *
     * @return path to local binary jar
     * @throws IOException in case of problems during copying binary jar
     */
    Path copyBinaryToLocal() throws IOException {
      Path localBinary = new Path(localTmpDir, binaryName);
      fs.copyToLocalFile(tmpRemoteBinary, localBinary);
      return localBinary;
    }

    /**
     * Copies binary and source jars to registry area,
     * in case of {@link IOException} while copying the source, removes copied jar(-s) from registry area.
     *
     * @throws IOException the original exception is re-thrown in case of problems during copying process
     */
    void copyToRegistryArea() throws IOException {
      FileUtil.copy(fs, tmpRemoteBinary, fs, registryBinary, false, true, fs.getConf());
      try {
        FileUtil.copy(fs, tmpRemoteSource, fs, registrySource, false, true, fs.getConf());
      } catch (IOException e) {
        // remove the already copied binary and whatever may have been written for the source
        deleteQuietlyFromRegistryArea();
        throw e;
      }
    }

    /**
     * Deletes binary and sources jars from staging area, in case of problems, logs warning and proceeds.
     */
    void deleteQuietlyFromStagingArea() {
      deleteQuietly(stagingBinary, false);
      deleteQuietly(stagingSource, false);
    }

    /**
     * Deletes binary and sources jars from registry area, in case of problems, logs warning and proceeds.
     */
    void deleteQuietlyFromRegistryArea() {
      deleteQuietly(registryBinary, false);
      deleteQuietly(registrySource, false);
    }

    /**
     * Removes quietly remote and local unique folders in temporary directories.
     */
    void cleanUp() {
      FileUtils.deleteQuietly(new File(localTmpDir.toUri()));
      deleteQuietly(remoteTmpDir, true);
    }

    /**
     * Checks if passed path exists on predefined file system.
     *
     * @param path path to be checked
     * @throws IOException if path does not exist
     */
    private void checkPathExistence(Path path) throws IOException {
      if (!fs.exists(path)) {
        throw new IOException(String.format("File %s does not exist on file system %s",
            path.toUri().getPath(), fs.getUri()));
      }
    }

    /**
     * Deletes quietly file or directory, in case of errors, logs warning and proceeds.
     *
     * @param path path to file or directory
     * @param isDirectory set to true if we need to delete a directory (deleted recursively)
     */
    private void deleteQuietly(Path path, boolean isDirectory) {
      try {
        fs.delete(path, isDirectory);
      } catch (IOException e) {
        // trailing throwable argument is logged with its stack trace by SLF4J
        logger.warn("Error during deletion [{}]", path.toUri().getPath(), e);
      }
    }
  }
}