blob: c83ca22a305cd566b7315175684d1edae071caf7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.scripting.groovy;
import groovy.util.ResourceException;
import groovy.util.ScriptException;
import java.io.File;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.OutputSchema;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.apache.pig.scripting.ScriptEngine;
public class GroovyEvalFunc<T> extends EvalFunc<T> {
private Schema schema = null;
private GroovyEvalFunc schemaFunction = null;
protected Method method = null;
private static Map<String, Class> scriptClasses = new ConcurrentHashMap<String, Class>();
private Object invocationTarget;
public GroovyEvalFunc() {
}
public GroovyEvalFunc(String path, String namespace, String methodName) throws IOException {
this(path, namespace, methodName, null);
}
public GroovyEvalFunc(String path, String namespace, String methodName, Object target) throws IOException {
String fqmn = "".equals(namespace) ? methodName : namespace + ScriptEngine.NAMESPACE_SEPARATOR + methodName;
Class c = scriptClasses.get(path);
if (null == c) {
File file = new File(path);
URL resource = null;
if (!file.exists()) {
resource = ScriptEngine.class.getResource(path);
if (resource == null) {
resource = ScriptEngine.class.getResource(File.separator + path);
}
if (resource == null) {
//Try loading the script from other locally available jars (needed for Spark mode)
resource = Thread.currentThread().getContextClassLoader().getResource(path);
}
if (resource == null) {
throw new IOException("Cannot find " + path);
}
} else {
resource = file.toURL();
}
try {
c = GroovyScriptEngine.getEngine().loadScriptByName(resource.toString());
} catch (ScriptException se) {
throw new IOException(se);
} catch (ResourceException re) {
throw new IOException(re);
}
}
scriptClasses.put(path, c);
Method[] methods = c.getMethods();
int matches = 0;
for (Method m : methods) {
if (m.getName().equals(methodName)) {
this.method = m;
matches++;
}
}
if (null == this.method) {
throw new IOException("Method " + methodName + " was not found in '" + path + "'");
}
if (matches > 1) {
throw new IOException("There are " + matches + " methods with name '" + methodName + "', please make sure method names are unique within the Groovy class.");
}
//
// Extract schema
//
Annotation[] annotations = this.method.getAnnotations();
for (Annotation annotation : annotations) {
if (annotation.annotationType().equals(OutputSchemaFunction.class)) {
this.schemaFunction = new GroovyEvalFuncObject(path, namespace, ((OutputSchemaFunction) annotation).value());
break;
} else if (annotation.annotationType().equals(OutputSchema.class)) {
this.schema = Utils.getSchemaFromString(((OutputSchema) annotation).value());
break;
}
}
//
// For static method, invocation target is null, for non
// static method, create/set invocation target unless passed
// to the constructor
//
if (!Modifier.isStatic(this.method.getModifiers())) {
if (null != target) {
this.invocationTarget = target;
} else {
try {
this.invocationTarget = c.newInstance();
} catch (InstantiationException ie) {
throw new IOException(ie);
} catch (IllegalAccessException iae) {
throw new IOException(iae);
}
}
}
}
@Override
public T exec(Tuple input) throws IOException {
Object[] args = new Object[null != input ? input.size() : 0];
for (int i = 0; i < args.length; i++) {
args[i] = GroovyUtils.pigToGroovy(input.get(i));
}
try {
if (this.method.getReturnType().equals(Void.TYPE)) {
//
// Invoke method but return null if method is 'void',
// this is done so we can wrap 'accumulate' and 'cleanup' methods too.
//
this.method.invoke(this.invocationTarget, args);
return null;
} else {
return (T) GroovyUtils.groovyToPig(this.method.invoke(this.invocationTarget, args));
}
} catch (InvocationTargetException ite) {
throw new IOException(ite);
} catch (IllegalAccessException iae) {
throw new IOException(iae);
}
}
@Override
public Schema outputSchema(Schema input) {
if (null != this.schemaFunction) {
try {
Tuple t = TupleFactory.getInstance().newTuple(1);
// Strip enclosing '{}' from schema
t.set(0, input.toString().replaceAll("^\\{", "").replaceAll("\\}$", ""));
return Utils.getSchemaFromString((String) this.schemaFunction.exec(t));
} catch (ParserException pe) {
throw new RuntimeException(pe);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
return this.schema;
}
}
public Object getInvocationTarget() {
return this.invocationTarget;
}
}