blob: eaa2839f58852767d6675b2521c7cc229543ac21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.expr.fn;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import javax.inject.Inject;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.scanner.persistence.FieldDescriptor;
import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.DrillFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Converts FunctionCalls to Java Expressions.
*/
public class FunctionConverter {
private static final Logger logger = LoggerFactory.getLogger(FunctionConverter.class);
public DrillFuncHolder getHolder(AnnotatedClassDescriptor func, ClassLoader classLoader) {
FunctionTemplate template = func.getAnnotationProxy(FunctionTemplate.class);
if (template == null) {
return failure("Class does not declare FunctionTemplate annotation.", func);
}
String name = template.name();
List<String> names = Arrays.asList(template.names());
if (name.isEmpty() && names.isEmpty()) {
// none set
return failure("Must define 'name' or 'names'", func);
}
if (!name.isEmpty() && !names.isEmpty()) {
// both are set
return failure("Must use only one annotations 'name' or 'names', not both", func);
}
// start by getting field information.
List<ValueReference> params = Lists.newArrayList();
List<WorkspaceReference> workspaceFields = Lists.newArrayList();
ValueReference outputField = null;
int varArgsCount = 0;
for (FieldDescriptor field : func.getFields()) {
Param param = field.getAnnotationProxy(Param.class);
Output output = field.getAnnotationProxy(Output.class);
Workspace workspace = field.getAnnotationProxy(Workspace.class);
Inject inject = field.getAnnotationProxy(Inject.class);
Annotation[] annotations = {param, output, workspace, inject};
int annotationCount = 0;
for (Annotation annotationDescriptor : annotations) {
if (annotationDescriptor != null) {
annotationCount += 1;
}
}
if (annotationCount == 0) {
return failure("The field must be either a @Param, @Output, @Inject or @Workspace field.", func, field);
} else if(annotationCount > 1) {
return failure("The field must be only one of @Param, @Output, @Inject or @Workspace. It currently has more than one of these annotations.", func, field);
}
// TODO(Julien): verify there are a few of those and we can load them
Class<?> fieldClass = field.getFieldClass();
if (param != null || output != null) {
if (Object[].class.isAssignableFrom(fieldClass)) {
fieldClass = fieldClass.getComponentType();
varArgsCount++;
} else if (varArgsCount > 0 && param != null) {
return failure("Vararg should be the last argument in the function.", func, field);
}
if (varArgsCount > 1) {
return failure("Function should contain single vararg argument", func, field);
}
// Special processing for @Param FieldReader
if (param != null && FieldReader.class.isAssignableFrom(fieldClass)) {
ValueReference fieldReaderRef = ValueReference.createFieldReaderRef(field.getName());
fieldReaderRef.setVarArg(varArgsCount > 0);
params.add(fieldReaderRef);
continue;
}
// Special processing for @Output ComplexWriter
if (output != null && ComplexWriter.class.isAssignableFrom(fieldClass)) {
if (outputField != null) {
return failure("You've declared more than one @Output field.\n" +
"You must declare one and only @Output field per Function class.", func, field);
} else {
outputField = ValueReference.createComplexWriterRef(field.getName());
}
continue;
}
// check that param and output are value holders.
if (!ValueHolder.class.isAssignableFrom(fieldClass)) {
return failure(String.format("The field doesn't holds value of type %s which does not implement the ValueHolder or ComplexWriter interfaces.\n" +
"All fields of type @Param or @Output must extend this interface.", fieldClass), func, field);
}
// get the type field from the value holder.
MajorType type;
try {
type = getStaticFieldValue("TYPE", fieldClass, MajorType.class);
} catch (Exception e) {
return failure("Failure while trying to access the ValueHolder's TYPE static variable. All ValueHolders must contain a static TYPE variable that defines their MajorType.", e, func, field);
}
ValueReference p = new ValueReference(type, field.getName());
if (param != null) {
p.setConstant(param.constant());
p.setVarArg(varArgsCount > 0);
params.add(p);
} else {
if (outputField != null) {
return failure("You've declared more than one @Output field. You must declare one and only @Output field per Function class.", func, field);
} else {
outputField = p;
}
}
} else {
// workspace work.
boolean isInject = inject != null;
if (isInject && UdfUtilities.INJECTABLE_GETTER_METHODS.get(fieldClass) == null) {
return failure(
String.format(
"A %s cannot be injected into a %s,"
+ " available injectable classes are: %s.",
fieldClass, DrillFunc.class.getSimpleName(),
Joiner.on(",").join(UdfUtilities.INJECTABLE_GETTER_METHODS.keySet())
), func, field);
}
WorkspaceReference wsReference = new WorkspaceReference(fieldClass, field.getName(), isInject);
if (!isInject && template.scope() == FunctionScope.POINT_AGGREGATE && !ValueHolder.class.isAssignableFrom(fieldClass) ) {
return failure(String.format("Aggregate function '%s' workspace variable '%s' is of type '%s'. Please change it to Holder type.", func.getClassName(), field.getName(), fieldClass), func, field);
}
//If the workspace var is of Holder type, get its MajorType and assign to WorkspaceReference.
if (ValueHolder.class.isAssignableFrom(fieldClass)) {
MajorType majorType;
try {
majorType = getStaticFieldValue("TYPE", fieldClass, MajorType.class);
} catch (Exception e) {
return failure("Failure while trying to access the ValueHolder's TYPE static variable. All ValueHolders must contain a static TYPE variable that defines their MajorType.", e, func, field);
}
wsReference.setMajorType(majorType);
}
workspaceFields.add(wsReference);
}
}
if (outputField == null) {
return failure("This function declares zero output fields. A function must declare one output field.", func);
}
FunctionInitializer initializer = new FunctionInitializer(func.getClassName(), classLoader);
try {
// return holder
ValueReference[] ps = params.toArray(new ValueReference[0]);
WorkspaceReference[] works = workspaceFields.toArray(new WorkspaceReference[0]);
FunctionAttributes functionAttributes = new FunctionAttributes(template, ps, outputField, works);
switch (template.scope()) {
case POINT_AGGREGATE:
return outputField.isComplexWriter() ?
new DrillComplexWriterAggFuncHolder(functionAttributes, initializer) :
new DrillAggFuncHolder(functionAttributes, initializer);
case SIMPLE:
return outputField.isComplexWriter() ?
new DrillComplexWriterFuncHolder(functionAttributes, initializer) :
new DrillSimpleFuncHolder(functionAttributes, initializer);
case HOLISTIC_AGGREGATE:
case RANGE_AGGREGATE:
default:
return failure("Unsupported Function Type.", func);
}
} catch (Exception | NoSuchFieldError | AbstractMethodError ex) {
return failure("Failure while creating function holder.", ex, func);
}
}
@SuppressWarnings("unchecked")
private <T> T getStaticFieldValue(String fieldName, Class<?> valueType, Class<T> c) throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException{
Field f = valueType.getDeclaredField(fieldName);
Object val = f.get(null);
return (T) val;
}
private static DrillFuncHolder failure(String message, Throwable t, AnnotatedClassDescriptor func, FieldDescriptor field) {
return fieldFailure(message, t, func.getClassName(), field.getName());
}
private static DrillFuncHolder failure(String message, AnnotatedClassDescriptor func, FieldDescriptor field) {
return fieldFailure(message, null, func.getClassName(), field.getName());
}
private DrillFuncHolder failure(String message, AnnotatedClassDescriptor func) {
return classFailure(message, null, func.getClassName());
}
private DrillFuncHolder failure(String message, Throwable t, AnnotatedClassDescriptor func) {
return classFailure(message, t, func.getClassName());
}
private static DrillFuncHolder classFailure(String message, Throwable t, String funcClassName) {
return failure(String.format("Failure loading function class [%s]. Message: %s", funcClassName, message), t);
}
private static DrillFuncHolder fieldFailure(String message, Throwable t, String funcClassName, String fieldName) {
return failure(String.format("Failure loading function class %s, field %s. Message: %s", funcClassName, fieldName, message), t);
}
private static DrillFuncHolder failure(String message, Throwable t) {
if (t == null) {
t = new DrillRuntimeException(message);
}
logger.warn(message, t);
return null;
}
}