blob: 688b577c744f49be455040d5e98168710d88185d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.scripting.jython;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.pigstats.PigStats;
import org.python.core.ClasspathPyImporter;
import org.python.core.Py;
import org.python.core.PyException;
import org.python.core.PyFrame;
import org.python.core.PyFunction;
import org.python.core.PyInteger;
import org.python.core.PyJavaPackage;
import org.python.core.PyObject;
import org.python.core.PyString;
import org.python.core.PyStringMap;
import org.python.core.PySystemState;
import org.python.core.PyTuple;
import org.python.modules.zipimport.zipimporter;
import org.python.util.PythonInterpreter;
/**
* Implementation of the script engine for Jython
*/
public class JythonScriptEngine extends ScriptEngine {
private static final Log LOG = LogFactory.getLog(JythonScriptEngine.class);
/**
* Language Interpreter Uses static holder pattern
*/
private static class Interpreter {
static final PythonInterpreter interpreter;
static final ArrayList<String> filesLoaded = new ArrayList<String>();
static final String JVM_JAR;
static {
// should look like: file:JVM_JAR!/java/lang/Object.class
String rpath = Object.class.getResource("Object.class").getPath();
JVM_JAR = rpath.replaceAll("^file:(.*)!/java/lang/Object.class$", "$1");
// Determine if a usable python.cachedir has been provided
// if not, certain uses of jython's import will not work e.g., so create a tmp dir
// - from some.package import *
// - import non.jvm.package
try {
String skip = System.getProperty(PySystemState.PYTHON_CACHEDIR_SKIP, "false");
if (skip.equalsIgnoreCase("true")) {
LOG.warn("jython cachedir skipped, jython may not work");
} else {
File tmp = null;
String cdir = System.getProperty(PySystemState.PYTHON_CACHEDIR);
if (cdir != null) {
tmp = new File(cdir);
if (tmp.canWrite() == false) {
LOG.error("CACHEDIR: not writable");
throw new RuntimeException("python.cachedir not writable: " + cdir);
}
}
if (tmp == null) {
tmp = File.createTempFile("pig_jython_", "");
tmp.delete();
if (tmp.mkdirs() == false) {
LOG.warn("unable to create a tmp dir for the cache, jython may not work");
} else {
LOG.info("created tmp python.cachedir=" + tmp);
System.setProperty(PySystemState.PYTHON_CACHEDIR, tmp.getAbsolutePath());
}
Runtime.getRuntime().addShutdownHook(new DirDeleter(tmp));
}
}
// local file system import path elements: current dir, JYTHON_HOME/Lib
Py.getSystemState().path.append(new PyString(System.getProperty("user.dir")));
String jyhome = System.getenv("JYTHON_HOME");
if (jyhome != null) {
Py.getSystemState().path.append(new PyString(jyhome + File.separator + "Lib"));
}
} catch (Exception e) {
LOG.warn("issue with jython cache dir", e);
}
// cacdedir now configured, allocate the python interpreter
interpreter = new PythonInterpreter();
}
/**
* ensure the decorator functions are defined in the interpreter, and
* manage the module import dependencies.
* @param path location of a file to exec in the interpreter
* @param pigContext if non-null, module import state is tracked
* @throws IOException
*/
static synchronized void init(String path, PigContext pigContext) throws IOException {
// Decorators -
// "schemaFunction"
// "outputSchema"
// "outputSchemaFunction"
if (!filesLoaded.contains(path)) {
// attempt addition of schema decorator handler, fail silently
interpreter.exec("def outputSchema(schema_def):\n"
+ " def decorator(func):\n"
+ " func.outputSchema = schema_def\n"
+ " return func\n"
+ " return decorator\n\n");
interpreter.exec("def outputSchemaFunction(schema_def):\n"
+ " def decorator(func):\n"
+ " func.outputSchemaFunction = schema_def\n"
+ " return func\n"
+ " return decorator\n");
interpreter.exec("def schemaFunction(schema_def):\n"
+ " def decorator(func):\n"
+ " func.schemaFunction = schema_def\n"
+ " return func\n"
+ " return decorator\n\n");
InputStream is = getScriptAsStream(path);
if (is == null) {
throw new IllegalStateException("unable to create a stream for path: " + path);
}
try {
execfile(is, path, pigContext);
} finally {
is.close();
}
}
}
/**
* does not call script.close()
* @param script
* @param path
* @param pigContext
* @throws ExecException
*/
static void execfile(InputStream script, String path, PigContext pigContext) throws ExecException {
try {
if( pigContext != null ) {
String [] argv;
try {
argv = (String[])ObjectSerializer.deserialize(
pigContext.getProperties().getProperty(PigContext.PIG_CMD_ARGS_REMAINDERS));
} catch (IOException e) {
throw new ExecException("Cannot deserialize command line arguments", e);
}
PySystemState state = Py.getSystemState();
state.argv.clear();
if( argv != null ) {
for (String str : argv ) {
state.argv.append(new PyString(str));
}
} else {
LOG.warn(PigContext.PIG_CMD_ARGS_REMAINDERS
+ " is empty. This is not expected unless on testing." );
}
}
// determine the current module state
Map<String, String> before = pigContext != null ? getModuleState() : null;
if (before != null) {
// os.py, stax.py and posixpath.py are part of the initial state
// if Lib directory is present and without including os.py, modules
// which import os fail
Set<String> includePyModules = new HashSet<String>();
for (String key : before.keySet()) {
// $py.class is created if Lib folder is writable
if (key.endsWith(".py") || key.endsWith("$py.class")) {
includePyModules.add(key);
}
}
before.keySet().removeAll(includePyModules);
}
// exec the code, arbitrary imports are processed
interpreter.execfile(script, path);
// determine the 'post import' module state
Map<String, String> after = pigContext != null ? getModuleState() : null;
// add the module files to the context
if (after != null && pigContext != null) {
after.keySet().removeAll(before.keySet());
for (Map.Entry<String, String> entry : after.entrySet()) {
String modulename = entry.getKey();
String modulepath = entry.getValue();
if (modulepath.equals(JVM_JAR)) {
continue;
} else if (modulepath.endsWith(".jar") || modulepath.endsWith(".zip")) {
pigContext.addScriptJar(modulepath);
} else {
pigContext.addScriptFile(modulename, modulepath);
}
}
}
} catch (PyException e) {
if (e.match(Py.SystemExit)) {
PyObject value = e.value;
if (PyException.isExceptionInstance(e.value)) {
value = value.__findattr__("code");
}
if (new PyInteger(0).equals(value)) {
LOG.info("Script invoked sys.exit(0)");
return;
}
}
String message = "Python Error. " + e;
throw new ExecException(message, 1121, e);
}
}
static String get(String name) {
return interpreter.get(name).toString();
}
static void setMain(boolean isMain) {
if (isMain) {
interpreter.set("__name__", "__main__");
} else {
interpreter.set("__name__", "__lib__");
}
}
/**
* get the state of modules currently loaded
* @return a map of module name to module file (absolute path)
*/
private static Map<String, String> getModuleState() {
// determine the current module state
Map<String, String> files = new HashMap<String, String>();
PyStringMap modules = (PyStringMap) Py.getSystemState().modules;
for (PyObject kvp : modules.iteritems().asIterable()) {
PyTuple tuple = (PyTuple) kvp;
String name = tuple.get(0).toString();
Object value = tuple.get(1);
// inspect the module to determine file location and status
try {
Object fileEntry = null;
Object loader = null;
if (value instanceof PyJavaPackage ) {
fileEntry = ((PyJavaPackage) value).__file__;
} else if (value instanceof PyObject) {
// resolved through the filesystem (or built-in)
PyObject dict = ((PyObject) value).getDict();
if (dict != null) {
fileEntry = dict.__finditem__("__file__");
loader = dict.__finditem__("__loader__");
} // else built-in
} // else some system module?
if (fileEntry != null) {
File file = resolvePyModulePath(fileEntry.toString(), loader);
if (file.exists()) {
String apath = file.getAbsolutePath();
if (apath.endsWith(".jar") || apath.endsWith(".zip")) {
// jar files are simple added to the pigContext
files.put(apath, apath);
} else {
// determine the relative path that the file should have in the jar
int pos = apath.lastIndexOf(File.separatorChar + name.replace('.', File.separatorChar));
if (pos > 0) {
files.put(apath.substring(pos + 1), apath);
} else {
files.put(apath, apath);
}
}
} else {
LOG.warn("module file does not exist: " + name + ", " + file);
}
} // else built-in
} catch (Exception e) {
LOG.warn("exception while retrieving module state: " + value, e);
}
}
return files;
}
}
private static File resolvePyModulePath(String path, Object loader) {
File file = new File(path);
if (!file.exists() && loader != null) {
if(path.startsWith(ClasspathPyImporter.PYCLASSPATH_PREFIX) && loader instanceof ClasspathPyImporter) {
path = path.replaceFirst(ClasspathPyImporter.PYCLASSPATH_PREFIX, "");
URL resource = ScriptEngine.class.getResource(path);
if (resource == null) {
resource = ScriptEngine.class.getResource(File.separator + path);
}
if (resource != null) {
return new File(resource.getFile());
}
} else if (loader instanceof zipimporter) {
zipimporter importer = (zipimporter) loader;
return new File(importer.archive);
} //JavaImporter??
}
return file;
}
@Override
public void registerFunctions(String path, String namespace, PigContext pigContext)
throws IOException {
Interpreter.setMain(false);
Interpreter.init(path, pigContext);
pigContext.addScriptJar(getJarPath(PythonInterpreter.class));
PythonInterpreter pi = Interpreter.interpreter;
@SuppressWarnings("unchecked")
List<PyTuple> locals = ((PyStringMap) pi.getLocals()).items();
namespace = (namespace == null) ? "" : namespace + NAMESPACE_SEPARATOR;
try {
for (PyTuple item : locals) {
String key = (String) item.get(0);
Object value = item.get(1);
FuncSpec funcspec = null;
if (!key.startsWith("__") && !key.equals("schemaFunction")
&& !key.equals("outputSchema")
&& !key.equals("outputSchemaFunction")
&& (value instanceof PyFunction)
&& (((PyFunction)value).__findattr__("schemaFunction")== null)) {
PyObject obj = ((PyFunction)value).__findattr__("outputSchema");
if(obj != null) {
Utils.getSchemaFromString(obj.toString());
}
funcspec = new FuncSpec(JythonFunction.class.getCanonicalName() + "('"
+ path + "','" + key +"')");
pigContext.registerFunction(namespace + key, funcspec);
LOG.info("Register scripting UDF: " + namespace + key);
}
}
} catch (ParserException pe) {
throw new IOException(
"Error parsing schema for script function from the decorator",
pe);
}
pigContext.addScriptFile(path);
Interpreter.setMain(true);
}
/**
* Gets the Python function object.
* @param path Path of the jython script file containing the function.
* @param functionName Name of the function
* @return a function object
* @throws IOException
*/
public static PyFunction getFunction(String path, String functionName) throws IOException {
Interpreter.setMain(false);
Interpreter.init(path, null);
return (PyFunction) Interpreter.interpreter.get(functionName);
}
@Override
protected Map<String, List<PigStats>> main(PigContext pigContext, String scriptFile)
throws IOException {
if (System.getProperty(PySystemState.PYTHON_CACHEDIR_SKIP)==null)
System.setProperty(PySystemState.PYTHON_CACHEDIR_SKIP, "false");
PigServer pigServer = new PigServer(pigContext, false);
// register dependencies
String jythonJar = getJarPath(PythonInterpreter.class);
if (jythonJar != null) {
pigServer.registerJar(jythonJar);
}
File f = new File(scriptFile);
if (!f.canRead()) {
throw new IOException("Can't read file: " + scriptFile);
}
FileInputStream fis1 = new FileInputStream(scriptFile);
try {
if (hasFunction(fis1)) {
registerFunctions(scriptFile, null, pigContext);
}
} finally {
fis1.close();
}
Interpreter.setMain(true);
FileInputStream fis = new FileInputStream(scriptFile);
try {
load(fis, scriptFile, pigServer.getPigContext());
} finally {
fis.close();
}
return getPigStatsMap();
}
/**
* execs the script text using the interpreter. populates the pig context
* with necessary module paths. does not close the inputstream.
* @param script
* @param scriptFile
* @param pigContext
* @throws IOException
*/
public void load(InputStream script, String scriptFile, PigContext pigContext) throws IOException {
Interpreter.execfile(script, scriptFile, pigContext);
}
@Override
protected String getScriptingLang() {
return "jython";
}
@Override
protected Map<String, Object> getParamsFromVariables() throws IOException {
PyFrame frame = Py.getFrame();
@SuppressWarnings("unchecked")
List<PyTuple> locals = ((PyStringMap) frame.getLocals()).items();
Map<String, Object> vars = new HashMap<String, Object>();
for (PyTuple item : locals) {
String key = (String) item.get(0);
Object obj = item.get(1);
if (obj != null) {
String value = item.get(1).toString();
vars.put(key, value);
}
}
return vars;
}
private static final Pattern p = Pattern.compile("^\\s*def\\s+(\\w+)\\s*.+");
private static final Pattern p1 = Pattern.compile("^\\s*if\\s+__name__\\s+==\\s+[\"']__main__[\"']\\s*:\\s*$");
private static boolean hasFunction(InputStream is) throws IOException {
boolean hasFunction = false;
boolean hasMain = false;
InputStreamReader in = new InputStreamReader(is);
BufferedReader br = new BufferedReader(in);
String line = br.readLine();
while (line != null) {
if (p.matcher(line).matches()) {
hasFunction = true;
} else if (p1.matcher(line).matches()) {
hasMain = true;
}
line = br.readLine();
}
if (hasFunction && !hasMain) {
String msg = "Embedded script cannot mix UDFs with top level code. "
+ "Please use if __name__ == '__main__': construct";
throw new IOException(msg);
}
return hasFunction;
}
/**
* File.deleteOnExit(File) does not work for a non-empty directory. This
* Thread is used to clean up the python.cachedir (if it was a tmp dir
* created by the Engine)
*/
private static class DirDeleter extends Thread {
private final File dir;
public DirDeleter(final File file) {
dir = file;
}
@Override
public void run() {
try {
delete(dir);
} catch (Exception e) {
LOG.warn("on cleanup", e);
}
}
private static boolean delete(final File file) {
if (file.isDirectory()) {
for (File f : file.listFiles()) {
delete(f);
}
}
return file.delete();
}
}
}