blob: b7f5aeafd298067ceb37a0ce98e650859c64757a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.Function.CompareMode;
import org.apache.impala.catalog.ScalarFunction;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.hive.executor.UdfExecutor;
import org.apache.impala.thrift.TFunction;
import org.apache.impala.thrift.TFunctionCategory;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TCompactProtocol;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public abstract class FunctionUtils {
public static final Logger LOG = Logger.getLogger(FunctionUtils.class);
public static final FunctionResolutionOrder FUNCTION_RESOLUTION_ORDER =
new FunctionResolutionOrder();
/**
* Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
* class referred to by the given Java function. This method copies the UDF Jar
* referenced by "function" to a temporary file in localLibraryPath_ and loads it
* into the jvm. Then we scan all the methods in the class using reflection and extract
* those methods and create corresponding Impala functions. Currently Impala supports
* only "JAR" files for symbols and also a single Jar containing all the dependent
* classes rather than a set of Jar files.
*/
public static List<Function> extractFunctions(String db,
org.apache.hadoop.hive.metastore.api.Function function,
String localLibPath)
throws ImpalaRuntimeException{
List<Function> result = Lists.newArrayList();
List<String> addedSignatures = Lists.newArrayList();
StringBuilder warnMessage = new StringBuilder();
if (!FunctionUtils.isFunctionCompatible(function, warnMessage)) {
LOG.warn("Skipping load of incompatible function: " +
function.getFunctionName() + ". " + warnMessage.toString());
return result;
}
String jarUri = function.getResourceUris().get(0).getUri();
Class<?> udfClass = null;
Path localJarPath = null;
try {
// TODO(todd): cache these jars based on the mtime and file ID of the
// remote JAR? Can we share a cache with the backend?
localJarPath = new Path("file://" + localLibPath,
UUID.randomUUID().toString() + ".jar");
try {
FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
} catch (IOException e) {
String errorMsg = "Error loading Java function: " + db + "." +
function.getFunctionName() + ". Couldn't copy " + jarUri +
" to local path: " + localJarPath.toString();
LOG.error(errorMsg, e);
throw new ImpalaRuntimeException(errorMsg);
}
URL[] classLoaderUrls = new URL[] {new URL(localJarPath.toString())};
try (URLClassLoader urlClassLoader = new URLClassLoader(classLoaderUrls)) {
udfClass = urlClassLoader.loadClass(function.getClassName());
// Check if the class is of UDF type. Currently we don't support other functions
// TODO: Remove this once we support Java UDAF/UDTF
if (org.apache.hadoop.hive.ql.exec.FunctionUtils.getUDFClassType(udfClass) !=
org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType.UDF) {
LOG.warn("Ignoring load of incompatible Java function: " +
function.getFunctionName() + " as " +
org.apache.hadoop.hive.ql.exec.FunctionUtils.getUDFClassType(udfClass)
+ " is not a supported type. Only UDFs are supported");
return result;
}
// Load each method in the UDF class and create the corresponding Impala Function
// object.
for (Method m: udfClass.getMethods()) {
if (!m.getName().equals(UdfExecutor.UDF_FUNCTION_NAME)) continue;
Function fn = ScalarFunction.fromHiveFunction(db,
function.getFunctionName(), function.getClassName(),
m.getParameterTypes(), m.getReturnType(), jarUri);
if (fn == null) {
LOG.warn("Ignoring incompatible method: " + m.toString() + " during load of "
+ "Hive UDF:" + function.getFunctionName() + " from " + udfClass);
continue;
}
if (!addedSignatures.contains(fn.signatureString())) {
result.add(fn);
addedSignatures.add(fn.signatureString());
}
}
}
} catch (ClassNotFoundException c) {
String errorMsg = "Error loading Java function: " + db + "." +
function.getFunctionName() + ". Symbol class " + function.getClassName() +
" not found in Jar: " + jarUri;
LOG.error(errorMsg);
throw new ImpalaRuntimeException(errorMsg, c);
} catch (Exception e) {
LOG.error("Skipping function load: " + function.getFunctionName(), e);
throw new ImpalaRuntimeException("Error extracting functions", e);
} catch (LinkageError e) {
String errorMsg = "Error resolving dependencies for Java function: " + db + "." +
function.getFunctionName();
LOG.error(errorMsg);
throw new ImpalaRuntimeException(errorMsg, e);
} finally {
if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath);
}
return result;
}
public static List<Function> deserializeNativeFunctionsFromDbParams(
Map<String, String> dbParams) {
List<Function> results = Lists.newArrayList();
TCompactProtocol.Factory protocolFactory = new TCompactProtocol.Factory();
for (Map.Entry<String, String> entry: dbParams.entrySet()) {
if (!entry.getKey().startsWith(Db.FUNCTION_INDEX_PREFIX)) continue;
try {
TFunction fn = new TFunction();
JniUtil.deserializeThrift(protocolFactory, fn,
Base64.getDecoder().decode(entry.getValue()));
results.add(Function.fromThrift(fn));
} catch (ImpalaException e) {
LOG.error("Encountered an error during function load: key=" +
entry.getKey() + ",continuing", e);
}
}
return results;
}
/**
* Checks if the Hive function 'fn' is Impala compatible. A function is Impala
* compatible iff
*
* 1. The function is JAVA based,
* 2. Has exactly one binary resource associated (We don't support loading
* dependencies yet) and
* 3. The binary is of type JAR.
*
* Returns true if compatible and false otherwise. In case of incompatible
* functions 'incompatMsg' has the reason for the incompatibility.
* */
private static boolean isFunctionCompatible(
org.apache.hadoop.hive.metastore.api.Function fn, StringBuilder incompatMsg) {
boolean isCompatible = true;
if (fn.getFunctionType() != FunctionType.JAVA) {
isCompatible = false;
incompatMsg.append("Function type: " + fn.getFunctionType().name()
+ " is not supported. Only " + FunctionType.JAVA.name() + " functions "
+ "are supported.");
} else if (fn.getResourceUrisSize() == 0) {
isCompatible = false;
incompatMsg.append("No executable binary resource (like a JAR file) is " +
"associated with this function. To fix this, recreate the function by " +
"specifying a 'location' in the function create statement.");
} else if (fn.getResourceUrisSize() != 1) {
isCompatible = false;
List<String> resourceUris = Lists.newArrayList();
for (ResourceUri resource: fn.getResourceUris()) {
resourceUris.add(resource.getUri());
}
incompatMsg.append("Impala does not support multiple Jars for dependencies."
+ "(" + Joiner.on(",").join(resourceUris) + ") ");
} else if (fn.getResourceUris().get(0).getResourceType() != ResourceType.JAR) {
isCompatible = false;
incompatMsg.append("Function binary type: " +
fn.getResourceUris().get(0).getResourceType().name()
+ " is not supported. Only " + ResourceType.JAR.name()
+ " type is supported.");
}
return isCompatible;
}
public static Function resolveFunction(Iterable<Function> fns, Function desc,
CompareMode mode) {
Preconditions.checkNotNull(fns);
Preconditions.checkNotNull(desc);
Preconditions.checkNotNull(mode);
// First check for identical
Function f = getBestFitFunction(fns, desc, Function.CompareMode.IS_IDENTICAL);
if (f != null || mode == Function.CompareMode.IS_IDENTICAL) return f;
// Next check for indistinguishable
f = getBestFitFunction(fns, desc, Function.CompareMode.IS_INDISTINGUISHABLE);
if (f != null || mode == Function.CompareMode.IS_INDISTINGUISHABLE) return f;
// Next check for strict supertypes
f = getBestFitFunction(fns, desc, Function.CompareMode.IS_SUPERTYPE_OF);
if (f != null || mode == Function.CompareMode.IS_SUPERTYPE_OF) return f;
// Finally check for non-strict supertypes
return getBestFitFunction(fns, desc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
}
private static Function getBestFitFunction(Iterable<Function> fns, Function desc,
CompareMode mode) {
int maxScore = -1;
Function maxFunc = null;
for (Function f: fns) {
int score = f.calcMatchScore(desc, mode);
if (score >= 0 && score > maxScore) {
maxScore = score;
maxFunc = f;
}
}
return maxFunc;
}
public static List<Function> getVisibleFunctionsInCategory(
Iterable<Function> candidates, TFunctionCategory category) {
List<Function> result = Lists.newArrayList();
for (Function fn: candidates) {
if (fn.userVisible() && Function.categoryMatch(fn, category)) {
result.add(fn);
}
}
return result;
}
public static List<Function> getVisibleFunctions(Iterable<Function> candidates) {
List<Function> result = Lists.newArrayList();
for (Function fn: candidates) {
if (fn.userVisible()) result.add(fn);
}
return result;
}
/**
* Comparator that sorts function overloads. We want overloads to be always considered
* in a canonical order so that overload resolution in the case of multiple valid
* overloads does not depend on the order in which functions are added to the Db. The
* order is based on the PrimitiveType enum because this was the order used implicitly
* for builtin operators and functions in earlier versions of Impala.
*/
public static class FunctionResolutionOrder implements Comparator<Function> {
@Override
public int compare(Function f1, Function f2) {
int numSharedArgs = Math.min(f1.getNumArgs(), f2.getNumArgs());
for (int i = 0; i < numSharedArgs; ++i) {
int cmp = typeCompare(f1.getArgs()[i], f2.getArgs()[i]);
if (cmp < 0) {
return -1;
} else if (cmp > 0) {
return 1;
}
}
// Put alternative with fewer args first.
if (f1.getNumArgs() < f2.getNumArgs()) {
return -1;
} else if (f1.getNumArgs() > f2.getNumArgs()) {
return 1;
}
return 0;
}
private int typeCompare(Type t1, Type t2) {
Preconditions.checkState(!t1.isComplexType());
Preconditions.checkState(!t2.isComplexType());
return Integer.compare(t1.getPrimitiveType().ordinal(),
t2.getPrimitiveType().ordinal());
}
}
}