blob: f7a35f9bf203d0e4709c9019270bb32a02c63fc1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.scripting.jython;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.python.core.Py;
import org.python.core.PyBaseCode;
import org.python.core.PyException;
import org.python.core.PyFunction;
import org.python.core.PyObject;
import org.python.core.PyTableCode;
/**
* Python implementation of a Pig UDF Performs mappings between Python & Pig
* data structures
*/
public class JythonFunction extends EvalFunc<Object> {
private static boolean logOnce=true;
private PyFunction function;
private Schema schema;
private int num_parameters;
private String scriptFilePath;
private String outputSchemaFunc;
public JythonFunction(String filename, String functionName) throws IOException{
PyFunction f;
boolean found = false;
try {
f = JythonScriptEngine.getFunction(filename, functionName);
this.function = f;
num_parameters = ((PyBaseCode) f.__code__).co_argcount;
PyObject outputSchemaDef = f.__findattr__("outputSchema".intern());
if (outputSchemaDef != null) {
this.schema = Utils.getSchemaFromString(outputSchemaDef.toString());
logOnce("Schema '"+outputSchemaDef.toString()+"' defined for func "+functionName);
found = true;
}
PyObject outputSchemaFunctionDef = f.__findattr__("outputSchemaFunction".intern());
if (outputSchemaFunctionDef != null) {
if(found) {
throw new ExecException(
"multiple decorators for " + functionName);
}
scriptFilePath = filename;
outputSchemaFunc = outputSchemaFunctionDef.toString();
logOnce("Schema Function '"+outputSchemaFunc+"' defined for func "+functionName);
this.schema = null;
found = true;
}
PyObject schemaFunctionDef = f.__findattr__("schemaFunction".intern());
if (schemaFunctionDef != null) {
if(found) {
throw new ExecException(
"multiple decorators for " + functionName);
}
// We should not see these functions here
// BUG
throw new ExecException(
"unregistered " + functionName);
}else if (!found && outputSchemaFunctionDef == null){
logOnce("No schema defined for function '"+functionName+ "' in "+filename);
}
} catch (ParserException pe) {
throw new ExecException("Could not parse schema for script function " + pe, pe);
} catch (IOException e) {
throw new IllegalStateException("Could not initialize: " + filename, e);
} catch (Exception e) {
throw new ExecException("Could not initialize: " + filename, e);
}
}
private void logOnce (String msg)
{
if(logOnce)
getLogger().info(msg);
logOnce = false;
}
@Override
public Object exec(Tuple tuple) throws IOException {
try {
if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) {
// ignore input tuple
PyObject out = function.__call__();
return JythonUtils.pythonToPig(out);
}
else {
// this way we get the elements of the tuple as parameters instead
// of one tuple object
PyObject[] params = JythonUtils.pigTupleToPyTuple(tuple).getArray();
return JythonUtils.pythonToPig(function.__call__(params));
}
} catch (PyException e) {
throw new ExecException("Error executing function", e);
} catch (Exception e) {
throw new IOException("Error executing function", e);
}
}
@Override
public Schema outputSchema(Schema input) {
if(schema != null) {
return schema;
} else {
if(outputSchemaFunc != null) {
PyFunction pf;
try {
pf = JythonScriptEngine.getFunction(scriptFilePath, outputSchemaFunc);
// this should be a schema function
PyObject schemaFunctionDef = pf.__findattr__("schemaFunction".intern());
if(schemaFunctionDef == null) {
throw new IllegalStateException("Function: "
+ outputSchemaFunc + " is not a schema function");
}
Object newSchema = ((pf.__call__(Py.java2py(input))).__tojava__(Object.class));
if (newSchema instanceof ResourceSchema) {
return(Schema.getPigSchema((ResourceSchema) newSchema));
}
else if (newSchema instanceof Schema) {
return (Schema) newSchema;
}
else {
return Utils.getSchemaFromString(newSchema.toString());
}
} catch (IOException ioe) {
throw new IllegalStateException("Could not find function: "
+ outputSchemaFunc + "()", ioe);
}
} else {
return new Schema(new Schema.FieldSchema(null, DataType.BYTEARRAY));
}
}
}
}