blob: 1af2a3d320c9ec7b2f8aaae33214a753a5def11c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.scripting.jruby;
import java.io.IOException;
import java.util.Map;
import org.apache.pig.AlgebraicEvalFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.scripting.jruby.JrubyScriptEngine.RubyFunctions;
import org.jruby.Ruby;
import org.jruby.embed.ScriptingContainer;
import org.jruby.runtime.builtin.IRubyObject;
/**
* This class provides the bridge between Ruby classes that extend the AlgebraicPigUdf
* "interface" by implementing an initial, intermed, and final method. Unlike EvalFuncs
* and Accumulators, the type must be known at compile time (ie it can't return Object),
* as Pig inspects the type and ensures that it is valid. This is why class specific
* shells are provided at the bottom. This class leverages AlgebraicEvalFunc to provide
* the Accumulator and EvalFunc implementations.
*/
public abstract class JrubyAlgebraicEvalFunc<T> extends AlgebraicEvalFunc<T> {
protected static BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
protected static final ScriptingContainer rubyEngine = JrubyScriptEngine.rubyEngine;
protected static final Ruby ruby = rubyEngine.getProvider().getRuntime();
// It makes no sense to instantiate this without arguments
private JrubyAlgebraicEvalFunc() {}
/**
* The constructor takes information on the script and method being invoked and registers it with the
* superclass (which is necessary for AlgebraicEvalFunc).
*/
public JrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName, functionName); }
/**
* This class invokes the initial method on the given Ruby class. As a courtesy to the user, it unwraps the
* DataBag that is given to Initial and passes Ruby just the first Tuple that it contains, as the contract
* for the Initial function in Algebraic is that it is given a DataBag with one and only one Tuple. Finally,
* it wraps the Ruby output in a Tuple.
*/
public static class Initial extends AlgebraicFunctionWrapper<Tuple> {
public Initial() {}
public Initial(String fileName, String functionName) { super(fileName, functionName, "initial"); }
@Override
public Tuple exec(Tuple input) throws IOException {
if (!isInitialized())
initialize();
try {
IRubyObject inp = PigJrubyLibrary.pigToRuby(ruby, ((DataBag)input.get(0)).iterator().next().get(0));
IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
return mTupleFactory.newTuple(PigJrubyLibrary.rubyToPig(rubyResult));
} catch (Exception e) {
throw new IOException("Error executing initial function", e);
}
}
}
/**
* This class invokes the intermed method on the given Ruby class. It passes along the DataBag contained
* in the Tuple it is given, and wraps the Ruby output in a Tuple.
*/
public static class Intermed extends AlgebraicFunctionWrapper<Tuple> {
public Intermed() {}
public Intermed(String fileName, String functionName) { super(fileName, functionName, "intermed"); }
@Override
public Tuple exec(Tuple input) throws IOException {
if (!isInitialized())
initialize();
try {
RubyDataBag inp = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)input.get(0));
IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
return mTupleFactory.newTuple(PigJrubyLibrary.rubyToPig(rubyResult));
} catch (Exception e) {
throw new IOException("Error executing intermediate function: ", e);
}
}
}
/**
* This class invokes the final method on the given Ruby class. It passes along the DataBag contained
* in the Tuple it is given, and the raw result.
*/
public static class Final<T> extends AlgebraicFunctionWrapper<T> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName, functionName, "final"); }
@SuppressWarnings("unchecked")
@Override
public T exec(Tuple input) throws IOException {
if (!isInitialized())
initialize();
try {
RubyDataBag inp = new RubyDataBag(ruby, ruby.getClass("DataBag"), (DataBag)input.get(0));
IRubyObject rubyResult = rubyEngine.callMethod(getReceiver(), getStage(), inp, IRubyObject.class);
return (T)PigJrubyLibrary.rubyToPig(rubyResult);
} catch (Exception e) {
throw new IOException("Error executing final function", e);
}
}
}
/**
* This is a lightweight wrapper shell that registers information on the method being called,
* and provides the initializer that the static Algebraic classes (Initial, Intermed, Final)
* will use to execute.
*/
public static abstract class AlgebraicFunctionWrapper<T> extends EvalFunc<T> {
private String fileName;
private String functionName;
protected Object receiver;
protected boolean isInitialized = false;
protected String stage;
public String getStage() { return stage; }
public Object getReceiver() { return receiver; }
public String getFileName() { return fileName; }
public String getFunctionName() { return functionName; }
public AlgebraicFunctionWrapper() {}
/**
* In addition to registering the fileName and the functionName (which are given based on the
* arguments passed to super() in the containing class's constructor, each extending class
* will register their "stage," which will serve as the method to invoke on the Ruby class.
*/
public AlgebraicFunctionWrapper(String fileName, String functionName, String stage) {
this.fileName = fileName;
this.functionName = functionName;
this.stage = stage;
}
public boolean isInitialized() { return isInitialized; }
public void initialize() {
receiver = rubyEngine.callMethod(RubyFunctions.getFunctions("algebraic", fileName).get(functionName), "new");
isInitialized = true;
}
@Override
public abstract T exec(Tuple input) throws IOException;
}
@Override
public abstract String getFinal();
@Override
public String getInitial() { return Initial.class.getName(); }
@Override
public String getIntermed() { return Intermed.class.getName(); }
/**
* Unlike EvalFuncs and Accumulators, the type must be known at compile time (ie it
* can't return Object), as Pig inspects the type and ensures that it is valid. This
* is why class specific shells are provided here. This is also the reason why the
* Ruby Algebraic interface is the only interface that does not currently allow overriding
* outputSchema, and a fixed one must be provided.
*/
public static class BagJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<DataBag> {
public BagJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<DataBag> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class ChararrayJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<String> {
public ChararrayJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<String> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class DataByteArrayJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<DataByteArray> {
public DataByteArrayJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<DataByteArray> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class DoubleJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Double> {
public DoubleJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<Double> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class FloatJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Float> {
public FloatJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<Float> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class IntegerJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Integer> {
public IntegerJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<Integer> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class LongJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Long> {
public LongJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<Long> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class MapJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Map<?,?>> {
public MapJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<Map<?,?>> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
public static class TupleJrubyAlgebraicEvalFunc extends JrubyAlgebraicEvalFunc<Tuple> {
public TupleJrubyAlgebraicEvalFunc(String fileName, String functionName) { super(fileName,functionName); }
@Override
public String getFinal() { return Final.class.getName(); }
public static class Final extends JrubyAlgebraicEvalFunc.Final<Tuple> {
public Final() {}
public Final(String fileName, String functionName) { super(fileName,functionName); }
}
}
}