| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.scripting.groovy; |
| |
| import groovy.util.ResourceException; |
| import groovy.util.ScriptException; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.annotation.Annotation; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Modifier; |
| import java.net.URL; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.pig.EvalFunc; |
| import org.apache.pig.builtin.OutputSchema; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.logicalLayer.schema.Schema; |
| import org.apache.pig.impl.util.Utils; |
| import org.apache.pig.parser.ParserException; |
| import org.apache.pig.scripting.ScriptEngine; |
| |
| public class GroovyEvalFunc<T> extends EvalFunc<T> { |
| |
| private Schema schema = null; |
| |
| private GroovyEvalFunc schemaFunction = null; |
| |
| protected Method method = null; |
| |
| private static Map<String, Class> scriptClasses = new ConcurrentHashMap<String, Class>(); |
| |
| private Object invocationTarget; |
| |
| public GroovyEvalFunc() { |
| } |
| |
| public GroovyEvalFunc(String path, String namespace, String methodName) throws IOException { |
| this(path, namespace, methodName, null); |
| } |
| |
| public GroovyEvalFunc(String path, String namespace, String methodName, Object target) throws IOException { |
| String fqmn = "".equals(namespace) ? methodName : namespace + ScriptEngine.NAMESPACE_SEPARATOR + methodName; |
| |
| Class c = scriptClasses.get(path); |
| |
| if (null == c) { |
| File file = new File(path); |
| URL resource = null; |
| if (!file.exists()) { |
| resource = ScriptEngine.class.getResource(path); |
| if (resource == null) { |
| resource = ScriptEngine.class.getResource(File.separator + path); |
| } |
| if (resource == null) { |
| //Try loading the script from other locally available jars (needed for Spark mode) |
| resource = Thread.currentThread().getContextClassLoader().getResource(path); |
| } |
| if (resource == null) { |
| throw new IOException("Cannot find " + path); |
| } |
| } else { |
| resource = file.toURL(); |
| } |
| try { |
| c = GroovyScriptEngine.getEngine().loadScriptByName(resource.toString()); |
| } catch (ScriptException se) { |
| throw new IOException(se); |
| } catch (ResourceException re) { |
| throw new IOException(re); |
| } |
| } |
| |
| scriptClasses.put(path, c); |
| |
| Method[] methods = c.getMethods(); |
| |
| int matches = 0; |
| |
| for (Method m : methods) { |
| if (m.getName().equals(methodName)) { |
| this.method = m; |
| matches++; |
| } |
| } |
| |
| if (null == this.method) { |
| throw new IOException("Method " + methodName + " was not found in '" + path + "'"); |
| } |
| |
| if (matches > 1) { |
| throw new IOException("There are " + matches + " methods with name '" + methodName + "', please make sure method names are unique within the Groovy class."); |
| } |
| |
| // |
| // Extract schema |
| // |
| |
| Annotation[] annotations = this.method.getAnnotations(); |
| |
| for (Annotation annotation : annotations) { |
| if (annotation.annotationType().equals(OutputSchemaFunction.class)) { |
| this.schemaFunction = new GroovyEvalFuncObject(path, namespace, ((OutputSchemaFunction) annotation).value()); |
| break; |
| } else if (annotation.annotationType().equals(OutputSchema.class)) { |
| this.schema = Utils.getSchemaFromString(((OutputSchema) annotation).value()); |
| break; |
| } |
| } |
| |
| // |
| // For static method, invocation target is null, for non |
| // static method, create/set invocation target unless passed |
| // to the constructor |
| // |
| |
| if (!Modifier.isStatic(this.method.getModifiers())) { |
| if (null != target) { |
| this.invocationTarget = target; |
| } else { |
| try { |
| this.invocationTarget = c.newInstance(); |
| } catch (InstantiationException ie) { |
| throw new IOException(ie); |
| } catch (IllegalAccessException iae) { |
| throw new IOException(iae); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public T exec(Tuple input) throws IOException { |
| |
| Object[] args = new Object[null != input ? input.size() : 0]; |
| |
| for (int i = 0; i < args.length; i++) { |
| args[i] = GroovyUtils.pigToGroovy(input.get(i)); |
| } |
| |
| try { |
| if (this.method.getReturnType().equals(Void.TYPE)) { |
| // |
| // Invoke method but return null if method is 'void', |
| // this is done so we can wrap 'accumulate' and 'cleanup' methods too. |
| // |
| this.method.invoke(this.invocationTarget, args); |
| return null; |
| } else { |
| return (T) GroovyUtils.groovyToPig(this.method.invoke(this.invocationTarget, args)); |
| } |
| } catch (InvocationTargetException ite) { |
| throw new IOException(ite); |
| } catch (IllegalAccessException iae) { |
| throw new IOException(iae); |
| } |
| } |
| |
| @Override |
| public Schema outputSchema(Schema input) { |
| if (null != this.schemaFunction) { |
| try { |
| Tuple t = TupleFactory.getInstance().newTuple(1); |
| // Strip enclosing '{}' from schema |
| t.set(0, input.toString().replaceAll("^\\{", "").replaceAll("\\}$", "")); |
| return Utils.getSchemaFromString((String) this.schemaFunction.exec(t)); |
| } catch (ParserException pe) { |
| throw new RuntimeException(pe); |
| } catch (IOException ioe) { |
| throw new RuntimeException(ioe); |
| } |
| } else { |
| return this.schema; |
| } |
| } |
| |
| public Object getInvocationTarget() { |
| return this.invocationTarget; |
| } |
| } |