| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.scripting.js; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.pig.EvalFunc; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.data.BagFactory; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.logicalLayer.schema.Schema; |
| import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; |
| import org.apache.pig.impl.util.Utils; |
| import org.apache.pig.parser.ParserException; |
| import org.mozilla.javascript.Context; |
| import org.mozilla.javascript.NativeJavaObject; |
| import org.mozilla.javascript.NativeObject; |
| import org.mozilla.javascript.Scriptable; |
| import org.mozilla.javascript.Undefined; |
| |
| public class JsFunction extends EvalFunc<Object> { |
| private static final Log LOG = LogFactory.getLog(JsFunction.class); |
| |
| /** |
| * helper function for debugging |
| * @param schema a PIG schema |
| * @return a readable string representation of the schema |
| */ |
| private static String stringify(Schema schema) { |
| StringBuffer buffer = new StringBuffer(); |
| stringify(schema, buffer); |
| return buffer.toString(); |
| } |
| |
| /** |
| * |
| * helper function for debugging |
| * @param schema a PIG schema |
| * @param buffer where to write the readable representation of the schema |
| */ |
| private static void stringify(Schema schema, StringBuffer buffer) { |
| if (schema != null) { |
| buffer.append("( "); |
| List<FieldSchema> fields = schema.getFields(); |
| for (int i = 0; i < fields.size(); i++) { |
| FieldSchema array_element = fields.get(i); |
| if (i!=0) { |
| buffer.append(", "); |
| } |
| buffer |
| .append(DataType.findTypeName(array_element.type)) |
| .append(": ") |
| .append(array_element.alias) |
| .append(" "); |
| stringify(array_element.schema, buffer); |
| } |
| buffer.append(" )"); |
| } |
| } |
| |
| private String functionName; |
| private JsScriptEngine jsScriptEngine; |
| private Schema outputSchema; |
| |
| /////////////////////// |
| // Debugging functions |
| /////////////////////// |
| |
| private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema)); |
| } |
| } |
| |
| private void debugConvertJSToPig(int depth, String pigType, Object value, Schema schema) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(indent(depth)+"converting to Pig " + pigType + " " + toString(value) + " using " + stringify(schema)); |
| } |
| } |
| |
| private void debugReturn(int depth, Object value) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(indent(depth) + "returning " + toString(value)); |
| } |
| } |
| |
| /** |
| * generates a String of white spaces for debugging indentation |
| * @param depth the call depth |
| * @return a string of spaces of length 2*depth |
| */ |
| private String indent(int depth) { |
| StringBuffer b = new StringBuffer(depth*2); |
| for (int i = 0; i < depth * 2; i++) { |
| b.append(' '); |
| } |
| return b.toString(); |
| } |
| |
| /** |
| * debug utility to display a JS object |
| * @param buffer whil append the representation here |
| * @param object special treatment if a javascript object |
| */ |
| private void append(StringBuffer buffer, Object object) { |
| if (object instanceof Scriptable) { |
| Scriptable scriptable = (Scriptable) object; |
| buffer.append("{"); |
| boolean first = true; |
| for (Object id : scriptable.getIds()) { |
| if (first) { |
| first = false; |
| } else { |
| buffer.append(", "); |
| } |
| if (id instanceof String) { |
| buffer.append(id).append(": "); |
| append(buffer, scriptable.get((String)id, jsScriptEngine.getScope())); |
| } else { |
| buffer.append("[").append(id).append("]: "); |
| append(buffer, scriptable.get((Integer)id, jsScriptEngine.getScope())); |
| } |
| } |
| buffer.append("}"); |
| |
| } else { |
| buffer.append(object); |
| } |
| } |
| |
| /** |
| * debug utility to display a JS object |
| * @param object an object |
| * @return a string representation of the object |
| */ |
| private String toString(Object object) { |
| StringBuffer buffer = new StringBuffer(); |
| append(buffer, object); |
| return buffer.toString(); |
| |
| } |
| |
| ////////////////////// |
| |
| public JsFunction(String functionName) { |
| this.jsScriptEngine = JsScriptEngine.getInstance(); |
| this.functionName = functionName; |
| Object outputSchemaObj = jsScriptEngine.jsEval(this.getClass().getName() + "(String)", |
| functionName + ".outputSchema"); |
| //if no schema defined, fall back to bytearray |
| if (outputSchemaObj == null || outputSchemaObj instanceof Undefined) { |
| this.outputSchema = new Schema(new Schema.FieldSchema(null, DataType.BYTEARRAY)); |
| } |
| else { |
| try { |
| this.outputSchema = Utils.getSchemaFromString(outputSchemaObj.toString()); |
| } |
| catch (ParserException e) { |
| throw new IllegalArgumentException(functionName |
| + ".outputSchema is not a valid schema: " + e.getMessage(), e); |
| } |
| } |
| |
| } |
| |
| /////////////////////////// |
| // EvalFunc implementation |
| /////////////////////////// |
| |
| @Override |
| public Object exec(Tuple tuple) throws IOException { |
| Schema inputSchema = this.getInputSchema(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( "CALL " + stringify(outputSchema) + " " + functionName + " " + stringify(inputSchema)); |
| } |
| // UDF always take a tuple: unwrapping when not necessary to simplify UDFs |
| if (inputSchema.size() == 1 && inputSchema.getField(0).type == DataType.TUPLE) { |
| inputSchema = inputSchema.getField(0).schema; |
| } |
| |
| Scriptable params = pigTupleToJS(tuple, inputSchema, 0); |
| |
| Object[] passedParams = new Object[inputSchema.size()]; |
| for (int j = 0; j < passedParams.length; j++) { |
| passedParams[j] = params.get(inputSchema.getField(j).alias, params); |
| } |
| |
| Object result = jsScriptEngine.jsCall(functionName, passedParams); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( "call "+functionName+"("+Arrays.toString(passedParams)+") => "+toString(result)); |
| } |
| |
| // We wrap the result with an object in the following cases: |
| // 1. Result is not an object type. |
| // 2. OutputSchema is a tuple type. |
| if (!(result instanceof NativeObject) || outputSchema.getField(0).type == DataType.TUPLE) { |
| Scriptable wrapper = jsScriptEngine.jsNewObject(); |
| wrapper.put(outputSchema.getField(0).alias, wrapper, result); |
| result = wrapper; |
| } |
| Tuple evalTuple = jsToPigTuple((Scriptable)result, outputSchema, 0); |
| Object eval = outputSchema.size() == 1 ? evalTuple.get(0) : evalTuple; |
| LOG.debug(eval); |
| return eval; |
| } |
| |
| @Override |
| public Schema outputSchema(Schema input) { |
| this.setInputSchema(input); |
| return outputSchema; |
| } |
| |
| ///////////////////////// |
| // Conversion Pig to JS |
| ///////////////////////// |
| |
| /** |
| * converts a tuple to javascript object based on a schema |
| * @param tuple the tuple to convert |
| * @param schema the schema to use for conversion |
| * @param depth call depth used for debugging messages |
| * @return the resulting javascript object |
| */ |
| @SuppressWarnings("unchecked") |
| private Scriptable pigTupleToJS(Tuple tuple, Schema schema, int depth) throws FrontendException, ExecException { |
| debugConvertPigToJS(depth, "Tuple", tuple, schema); |
| Scriptable object = null; |
| if (tuple != null) { |
| object = jsScriptEngine.jsNewObject(); |
| |
| for (int i = 0; i < schema.size(); i++) { |
| FieldSchema field = schema.getField(i); |
| Object value; |
| if (field.type == DataType.BAG) { |
| value = pigBagToJS((DataBag)tuple.get(i), field.schema, depth + 1); |
| } else if (field.type == DataType.TUPLE) { |
| value = pigTupleToJS((Tuple)tuple.get(i), field.schema, depth + 1); |
| } else if (field.type == DataType.MAP) { |
| value = pigMapToJS((Map<String, Object>)tuple.get(i), field.schema, depth + 1); |
| } else { |
| debugConvertPigToJS(depth+1, "value", tuple.get(i), field.schema); |
| value = Context.javaToJS(tuple.get(i), jsScriptEngine.getScope()); |
| debugReturn(depth + 1, value); |
| } |
| object.put(field.alias, object, value); |
| } |
| } |
| debugReturn(depth, object); |
| return object; |
| } |
| |
| /** |
| * converts a map to javascript object based on a schema |
| * @param map the map to convert |
| * @param schema the schema to use for conversion |
| * @param depth call depth used for debugging messages |
| * @return the resulting javascript object |
| */ |
| private Scriptable pigMapToJS(Map<String, Object> map, Schema schema, int depth) { |
| debugConvertPigToJS(depth, "Map", map, schema); |
| Scriptable object = jsScriptEngine.jsNewObject(); |
| |
| for (Entry<String, Object> entry : map.entrySet()) { |
| object.put(entry.getKey(), object, entry.getValue()); |
| } |
| debugReturn(depth, object); |
| return object; |
| } |
| |
| /** |
| * converts a bag to javascript object based on a schema |
| * @param bag the bag to convert |
| * @param schema the schema to use for conversion |
| * @param depth call depth used for debugging messages |
| * @return the resulting javascript object |
| * @throws FrontendException |
| * @throws ExecException |
| */ |
| private Scriptable pigBagToJS(DataBag bag, Schema schema, int depth) throws FrontendException, ExecException { |
| debugConvertPigToJS(depth, "Bag", bag, schema); |
| if (schema.size() == 1 && schema.getField(0).type == DataType.TUPLE) { |
| // unwrapping as bags always contain a tuple |
| schema = schema.getField(0).schema; |
| } |
| Scriptable array = jsScriptEngine.jsNewArray(bag.size()); |
| array.setParentScope(jsScriptEngine.getScope()); |
| int i= 0; |
| for (Tuple t : bag) { |
| array.put(i++, array, pigTupleToJS(t, schema, depth + 1)); |
| } |
| debugReturn(depth, array); |
| return array; |
| } |
| |
| ///////////////////////// |
| // Conversion JS to Pig |
| ///////////////////////// |
| |
| private Tuple jsToPigTuple(Scriptable object, Schema schema, int depth) throws FrontendException, ExecException { |
| debugConvertJSToPig(depth, "Tuple", object, schema); |
| Tuple t = TupleFactory.getInstance().newTuple(schema.size()); |
| for (int i = 0; i < schema.size(); i++) { |
| FieldSchema field = schema.getField(i); |
| if (object.has(field.alias, jsScriptEngine.getScope())) { |
| Object attr = object.get(field.alias, object); |
| Object value; |
| if (field.type == DataType.BAG) { |
| value = jsToPigBag((Scriptable)attr, field.schema, depth + 1); |
| } else if (field.type == DataType.TUPLE) { |
| value = jsToPigTuple((Scriptable)attr, field.schema, depth + 1); |
| } else if (field.type == DataType.MAP) { |
| value = jsToPigMap((Scriptable)attr, field.schema, depth + 1); |
| } else if (attr instanceof NativeJavaObject) { |
| value = ((NativeJavaObject)attr).unwrap(); |
| } else if (attr instanceof Undefined) { |
| value = null; |
| } else { |
| value = attr; |
| } |
| t.set(i, value); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("X( "+field.alias+" NOT FOUND"); |
| } |
| } |
| } |
| debugReturn(depth, t); |
| return t; |
| } |
| |
| private Object jsToPigMap(Scriptable object, Schema schema, int depth) { |
| debugConvertJSToPig(depth, "Map", object, schema); |
| Map<String, Object> map = new HashMap<String, Object>(); |
| Object[] ids = object.getIds(); |
| for (Object id : ids) { |
| if (id instanceof String) { |
| String name = (String) id; |
| Object value = object.get(name, object); |
| if (value instanceof NativeJavaObject) { |
| value = ((NativeJavaObject)value).unwrap(); |
| } else if (value instanceof Undefined) { |
| value = null; |
| } |
| map.put(name, value); |
| } |
| } |
| debugReturn(depth, map); |
| return map; |
| } |
| |
| private DataBag jsToPigBag(Scriptable array, Schema schema, int depth) throws FrontendException, ExecException { |
| debugConvertJSToPig(depth, "Bag", array, schema); |
| if (schema.size() == 1 && schema.getField(0).type == DataType.TUPLE) { |
| schema = schema.getField(0).schema; |
| } |
| List<Tuple> bag = new ArrayList<Tuple>(); |
| for (Object id : array.getIds()) { |
| Scriptable arrayValue = (Scriptable)array.get(((Integer)id).intValue(), null); |
| bag.add(jsToPigTuple(arrayValue, schema, depth + 1)); |
| } |
| DataBag result = BagFactory.getInstance().newDefaultBag(bag); |
| debugReturn(depth, result); |
| return result; |
| } |
| |
| } |