blob: 7ccd3934c784739b9f04c3db6e9be13e1493d5eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE;
import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY;
import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.TerminatingAccumulator;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
import org.apache.pig.builtin.MonitoredUDF;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.TupleMaker;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
 * Physical operator that evaluates a user-defined function ({@link EvalFunc}).
 * <p>
 * Responsibilities visible in this class:
 * <ul>
 *   <li>Instantiating the UDF from its {@link FuncSpec} (including after
 *       deserialization, via {@link #readObject}).</li>
 *   <li>Assembling the UDF's input tuple from the child physical operators
 *       ({@link #processInput()}).</li>
 *   <li>Driving accumulative UDFs ({@link Accumulator}) including early
 *       termination for {@link TerminatingAccumulator}s.</li>
 *   <li>Swapping in the initial/intermediate/final stages of an
 *       {@link Algebraic} UDF ({@link #setAlgebraicFunction(byte)}).</li>
 *   <li>Optionally sampling per-UDF invocation/elapsed-time Hadoop counters
 *       when {@code PIG_UDF_PROFILE} is enabled.</li>
 * </ul>
 */
public class POUserFunc extends ExpressionOperator {
    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(POUserFunc.class);
    // NOTE(review): 'tf' appears unused within this class — candidate for removal.
    private static final TupleFactory tf = TupleFactory.getInstance();

    // Counter group (the FuncSpec string) used when UDF profiling is enabled.
    private transient String counterGroup;
    // The instantiated UDF. Transient: re-created by instantiateFunc(), which is
    // invoked from the constructor and again from readObject() on deserialization.
    private transient EvalFunc func;

    // Files the UDF requests via the distributed cache / to be shipped to tasks.
    private transient List<String> cacheFiles = null;
    private transient List<String> shipFiles = null;
    private transient Credentials creds = null;

    FuncSpec funcSpec;
    // The spec as originally supplied. setAlgebraicFunction() replaces funcSpec
    // with a stage-specific spec, but the Algebraic stage classes and ctor args
    // are always resolved against this original spec.
    FuncSpec origFSpec;

    // Algebraic stage selectors accepted by setAlgebraicFunction().
    public static final byte INITIAL = 0;
    public static final byte INTERMEDIATE = 1;
    public static final byte FINAL = 2;

    // One-time lazy init flag for processInput() (reporter/logger/conf/tuple maker).
    private boolean initialized = false;
    // Non-null only when the UDF class carries the @MonitoredUDF annotation.
    private MonitoredUDFExecutor executor = null;

    private PhysicalOperator referencedOperator = null;
    // Accumulator protocol state: true once getValue()/cleanup() have been done
    // for the current accumulation cycle, so the next call returns STATUS_EOP.
    private boolean isAccumulationDone;
    private String signature;

    // Lazily computed (once) in getNext(): whether func is a TerminatingAccumulator.
    private boolean haveCheckedIfTerminatingAccumulator;

    // Sampled UDF profiling: every timingFrequency-th invocation is timed and the
    // counters are incremented by timingFrequency to approximate the true totals.
    private long numInvocations = 0L;
    private long timingFrequency = 100L;
    private boolean doTiming = false;

    public PhysicalOperator getReferencedOperator() {
        return referencedOperator;
    }

    public void setReferencedOperator(PhysicalOperator referencedOperator) {
        this.referencedOperator = referencedOperator;
    }

    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        this(k, rp, inp, null);
    }

    public POUserFunc(
            OperatorKey k,
            int rp,
            List<PhysicalOperator> inp,
            FuncSpec funcSpec) {
        this(k, rp, inp, funcSpec, null);
    }

    /**
     * Full constructor.
     *
     * NOTE(review): the {@code func} argument assigned below is immediately
     * overwritten by instantiateFunc(funcSpec), which always re-instantiates
     * from the spec — confirm whether passing a pre-built EvalFunc is intended
     * to have any effect.
     */
    public POUserFunc(
            OperatorKey k,
            int rp,
            List<PhysicalOperator> inp,
            FuncSpec funcSpec,
            EvalFunc func) {
        super(k, rp);
        super.setInputs(inp);
        this.funcSpec = funcSpec;
        this.origFSpec = funcSpec;
        this.func = func;
        instantiateFunc(funcSpec);
    }

    /** Re-applies the UDF input schema recorded under this operator's signature. */
    public void setFuncInputSchema(){
        setFuncInputSchema(signature);
    }

    /**
     * Instantiates the UDF from the given spec and re-applies per-operator state
     * (signature, input schema, monitored-UDF wrapper).
     */
    private void instantiateFunc(FuncSpec fSpec) {
        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
        this.setSignature(signature);
        this.setFuncInputSchema(signature);
        // @MonitoredUDF UDFs are executed through a wrapper that enforces the
        // annotation's monitoring contract.
        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
            executor = new MonitoredUDFExecutor(func);
        }
        //the next couple of initializations do not work as intended for the following reasons
        //the reporter and pigLogger are member variables of PhysicalOperator
        //when instanitateFunc is invoked at deserialization time, both
        //reporter and pigLogger are null. They are set during map and reduce calls,
        //making the initializations here basically useless. Look at the processInput
        //method where these variables are re-initialized. At that point, the PhysicalOperator
        //is set up correctly with the reporter and pigLogger references
        this.func.setReporter(getReporter());
        this.func.setPigLogger(pigLogger);
    }

    // Builds the UDF input tuple; either a schema-specific SchemaTupleFactory or
    // the default TupleFactory, chosen once in processInput().
    private transient TupleMaker inputTupleMaker;
    // True when inputTupleMaker is a SchemaTupleFactory, i.e. the tuple size is
    // known up front and fields are set by index rather than appended.
    private boolean usingSchemaTupleFactory;

    /**
     * Pulls one value from each child operator and packs them into a single
     * input tuple for the UDF. Results of a project-to-end POProject child are
     * flattened into the input tuple rather than nested.
     *
     * @return STATUS_OK with the assembled tuple, STATUS_EOP when there is no
     *         input, or the child's non-OK status propagated unchanged.
     */
    @Override
    public Result processInput() throws ExecException {
        // Make sure the reporter is set, because it isn't getting carried
        // across in the serialization (don't know why). I suspect it's as
        // cheap to call the setReporter call everytime as to check whether I
        // have (hopefully java will inline it).
        if(!initialized) {
            func.setReporter(getReporter());
            func.setPigLogger(pigLogger);
            Configuration jobConf = UDFContext.getUDFContext().getJobConf();
            if (jobConf != null) {
                doTiming = jobConf.getBoolean(PIG_UDF_PROFILE, false);
                if (doTiming) {
                    counterGroup = funcSpec.toString();
                    timingFrequency = jobConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
                }
            }
            // We initialize here instead of instantiateFunc because this is called
            // when actual processing has begun, whereas a function can be instantiated
            // on the frontend potentially (mainly for optimization)
            Schema tmpS = func.getInputSchema();
            if (tmpS != null) {
                //Currently, getInstanceForSchema returns null if no class was found. This works fine...
                //if it is null, the default will be used. We pass the context because if it happens that
                //the same Schema was generated elsewhere, we do not want to override user expectations
                inputTupleMaker = SchemaTupleFactory.getInstance(tmpS, false, GenContext.UDF);
                if (inputTupleMaker == null) {
                    LOG.debug("No SchemaTupleFactory found for Schema ["+tmpS+"], using default TupleFactory");
                    usingSchemaTupleFactory = false;
                } else {
                    LOG.debug("Using SchemaTupleFactory for Schema: " + tmpS);
                    usingSchemaTupleFactory = true;
                }
                //In the future, we could optionally use SchemaTuples for output as well
            }
            if (inputTupleMaker == null) {
                inputTupleMaker = TupleFactory.getInstance();
            }
            initialized = true;
        }
        Result res = new Result();
        // No attached input and no child operators: nothing to produce.
        if (input == null && (inputs == null || inputs.size()==0)) {
            res.returnStatus = POStatus.STATUS_EOP;
            return res;
        }
        //Should be removed once the model is clear
        if(getReporter()!=null) {
            getReporter().progress();
        }
        if(isInputAttached()) {
            // Input was pushed to us directly; hand it back and detach.
            res.result = input;
            res.returnStatus = POStatus.STATUS_OK;
            detachInput();
            return res;
        } else {
            //we decouple this because there may be cases where the size is known and it isn't a schema
            // tuple factory
            boolean knownSize = usingSchemaTupleFactory;
            int knownIndex = 0;
            res.result = inputTupleMaker.newTuple();
            Result temp = null;
            for(PhysicalOperator op : inputs) {
                temp = op.getNext(op.getResultType());
                // Propagate any non-OK status (EOP, error, batch markers) as-is.
                if(temp.returnStatus!=POStatus.STATUS_OK) {
                    return temp;
                }
                // A project-to-end projection yields a tuple whose fields are
                // spliced into the UDF input rather than nested as one field.
                if(op instanceof POProject &&
                        op.getResultType() == DataType.TUPLE){
                    POProject projOp = (POProject)op;
                    if(projOp.isProjectToEnd()){
                        Tuple trslt = (Tuple) temp.result;
                        Tuple rslt = (Tuple) res.result;
                        for(int i=0;i<trslt.size();i++) {
                            if (knownSize) {
                                rslt.set(knownIndex++, trslt.get(i));
                            } else {
                                rslt.append(trslt.get(i));
                            }
                        }
                        continue;
                    }
                }
                if (knownSize) {
                    ((Tuple)res.result).set(knownIndex++, temp.result);
                } else {
                    ((Tuple)res.result).append(temp.result);
                }
            }
            res.returnStatus = temp.returnStatus;
            return res;
        }
    }

    // True once we know func is a TerminatingAccumulator (so early termination
    // is possible at all).
    private boolean isEarlyTerminating = false;

    private void setIsEarlyTerminating() {
        isEarlyTerminating = true;
    }

    private boolean isEarlyTerminating() {
        return isEarlyTerminating;
    }

    // True once the TerminatingAccumulator reported isFinished(); from then on
    // STATUS_EARLY_TERMINATION is returned for accumulate calls.
    private boolean isTerminated = false;

    private boolean hasBeenTerminated() {
        return isTerminated;
    }

    private void earlyTerminate() {
        isTerminated = true;
    }

    /**
     * Core evaluation: assembles the input tuple and invokes the UDF.
     * <p>
     * Accumulative UDFs follow the accumulate/getValue/cleanup protocol keyed
     * off {@code isAccumStarted()}; all other UDFs are invoked via
     * {@code exec()} (through {@link MonitoredUDFExecutor} when monitored).
     * IOExceptions from the UDF are rethrown as {@link ExecException} with Pig
     * error code 2078 unless the UDF supplied its own {@link PigException} code.
     */
    private Result getNext() throws ExecException {
        Result result = processInput();
        long startNanos = 0;
        // Sample 1 out of every timingFrequency invocations for profiling.
        boolean timeThis = doTiming && (numInvocations++ % timingFrequency == 0);
        if (timeThis) {
            startNanos = System.nanoTime();
            // Incrementing by timingFrequency extrapolates the sampled count to
            // an estimate of the total invocation count.
            PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_INVOCATION_COUNTER, timingFrequency);
        }
        try {
            if(result.returnStatus == POStatus.STATUS_OK) {
                if (isAccumulative()) {
                    if (isAccumStarted()) {
                        // One-time check: can this accumulator terminate early?
                        if (!haveCheckedIfTerminatingAccumulator) {
                            haveCheckedIfTerminatingAccumulator = true;
                            if (func instanceof TerminatingAccumulator<?>)
                                setIsEarlyTerminating();
                        }
                        if (!hasBeenTerminated() && isEarlyTerminating() && ((TerminatingAccumulator<?>)func).isFinished()) {
                            earlyTerminate();
                        }
                        if (hasBeenTerminated()) {
                            result.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
                            result.result = null;
                            isAccumulationDone = false;
                        } else {
                            ((Accumulator)func).accumulate((Tuple)result.result);
                            result.returnStatus = POStatus.STATUS_BATCH_OK;
                            result.result = null;
                            isAccumulationDone = false;
                        }
                    }else{
                        if(isAccumulationDone){
                            //PORelationToExprProject does not return STATUS_EOP
                            // so that udf gets called both when isAccumStarted
                            // is first true and then set to false, even
                            //when the input relation is empty.
                            // so the STATUS_EOP has to be sent from POUserFunc,
                            // after the results have been sent.
                            result.result = null;
                            result.returnStatus = POStatus.STATUS_EOP;
                        }
                        else{
                            // Accumulation phase ended: emit the final value and
                            // reset the accumulator for the next key.
                            result.result = ((Accumulator)func).getValue();
                            result.returnStatus = POStatus.STATUS_OK;
                            ((Accumulator)func).cleanup();
                            isAccumulationDone = true;
                        }
                    }
                } else {
                    // Let the UDF know when the very last input is being seen,
                    // if it asked for end-of-all-input notification.
                    if (parentPlan!=null && parentPlan.endOfAllInput && needEndOfAllInputProcessing()) {
                        func.setEndOfAllInput(true);
                    }
                    if (executor != null) {
                        result.result = executor.monitorExec((Tuple) result.result);
                    } else {
                        result.result = func.exec((Tuple) result.result);
                    }
                }
            }
            if (timeThis) {
                // NOTE(review): (delta / 1000) is integer division and Math.round
                // then operates on a widened value — intent appears to be
                // nanoseconds -> microseconds, scaled by the sampling frequency;
                // confirm precision is acceptable here.
                PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_ELAPSED_TIME_COUNTER,
                        Math.round((System.nanoTime() - startNanos) / 1000) * timingFrequency);
            }
            return result;
        } catch (ExecException ee) {
            throw ee;
        } catch (IOException ioe) {
            int errCode = 2078;
            String msg = "Caught error from UDF: " + funcSpec.getClassName();
            String footer = " [" + ioe.getMessage() + "]";
            // Prefer the UDF's own PigException error code/message when present.
            if(ioe instanceof PigException) {
                int udfErrorCode = ((PigException)ioe).getErrorCode();
                if(udfErrorCode != 0) {
                    errCode = udfErrorCode;
                    msg = ((PigException)ioe).getMessage();
                } else {
                    msg += " [" + ((PigException)ioe).getMessage() + " ]";
                }
            } else {
                msg += footer;
            }
            throw new ExecException(msg, errCode, PigException.BUG, ioe);
        } catch (IndexOutOfBoundsException ie) {
            int errCode = 2078;
            String msg = "Caught error from UDF: " + funcSpec.getClassName() +
                    ", Out of bounds access [" + ie.getMessage() + "]";
            throw new ExecException(msg, errCode, PigException.BUG, ie);
        }
    }

    // All typed getNext* variants delegate to the shared getNext() driver; the
    // declared result type does not change how the UDF is invoked.

    @Override
    public Result getNextTuple() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextDataBag() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextInteger() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextBoolean() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextDataByteArray() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextDouble() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextBigInteger() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextBigDecimal() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextFloat() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextLong() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextDateTime() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextMap() throws ExecException {
        return getNext();
    }

    @Override
    public Result getNextString() throws ExecException {
        return getNext();
    }

    /**
     * Replaces the wrapped UDF with the given stage (INITIAL/INTERMEDIATE/FINAL)
     * of the original {@link Algebraic} function, preserving constructor args,
     * and updates the result type to the new stage's return type.
     *
     * @param Function one of {@link #INITIAL}, {@link #INTERMEDIATE}, {@link #FINAL}
     * @throws ExecException if the original UDF is not Algebraic
     */
    public void setAlgebraicFunction(byte Function) throws ExecException {
        // This will only be used by the optimizer for putting correct functions
        // in the mapper,
        // combiner and reduce. This helps in maintaining the physical plan as
        // is without the
        // optimiser having to replace any operators.
        // You wouldn't be able to make two calls to this function on the same
        // algebraic EvalFunc as
        // func is being changed.
        switch (Function) {
        case INITIAL:
            funcSpec = new FuncSpec(getInitial());
            break;
        case INTERMEDIATE:
            funcSpec = new FuncSpec(getIntermed());
            break;
        case FINAL:
            funcSpec = new FuncSpec(getFinal());
            break;
        }
        funcSpec.setCtorArgs(origFSpec.getCtorArgs());
        instantiateFunc(funcSpec);
        setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
    }

    /**
     * @return the class name of the Algebraic initial stage.
     * Side effect: re-instantiates {@code func} from the original spec.
     * @throws ExecException (code 2072) if the UDF is not Algebraic
     */
    public String getInitial() throws ExecException {
        instantiateFunc(origFSpec);
        if (func instanceof Algebraic) {
            return ((Algebraic) func).getInitial();
        } else {
            int errCode = 2072;
            String msg = "Attempt to run a non-algebraic function"
                    + " as an algebraic function";
            throw new ExecException(msg, errCode, PigException.BUG);
        }
    }

    /**
     * @return the class name of the Algebraic intermediate stage.
     * Side effect: re-instantiates {@code func} from the original spec.
     * @throws ExecException (code 2072) if the UDF is not Algebraic
     */
    public String getIntermed() throws ExecException {
        instantiateFunc(origFSpec);
        if (func instanceof Algebraic) {
            return ((Algebraic) func).getIntermed();
        } else {
            int errCode = 2072;
            String msg = "Attempt to run a non-algebraic function"
                    + " as an algebraic function";
            throw new ExecException(msg, errCode, PigException.BUG);
        }
    }

    /**
     * @return the class name of the Algebraic final stage.
     * Side effect: re-instantiates {@code func} from the original spec.
     * @throws ExecException (code 2072) if the UDF is not Algebraic
     */
    public String getFinal() throws ExecException {
        instantiateFunc(origFSpec);
        if (func instanceof Algebraic) {
            return ((Algebraic) func).getFinal();
        } else {
            int errCode = 2072;
            String msg = "Attempt to run a non-algebraic function"
                    + " as an algebraic function";
            throw new ExecException(msg, errCode, PigException.BUG);
        }
    }

    /**
     * @return the return type of the originally-specified UDF.
     * Side effect: re-instantiates {@code func} from the original spec.
     */
    public Type getOriginalReturnType() throws ExecException {
        instantiateFunc(origFSpec);
        return func.getReturnType();
    }

    /** @return the return type of the currently instantiated UDF. */
    public Type getReturnType() {
        return func.getReturnType();
    }

    /** Notifies the UDF that processing is complete and stops any monitor. */
    public void finish() {
        func.finish();
        if (executor != null) {
            executor.terminate();
        }
    }

    /** Delegates output-schema computation to the UDF. */
    public Schema outputSchema(Schema input) {
        return func.outputSchema(input);
    }

    public Boolean isAsynchronous() {
        return func.isAsynchronous();
    }

    @Override
    public String name() {
        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitUserFunc(this);
    }

    public FuncSpec getFuncSpec() {
        return funcSpec;
    }

    /** Replaces the spec and re-instantiates the UDF from it. */
    public void setFuncSpec(FuncSpec funcSpec) {
        this.funcSpec = funcSpec;
        instantiateFunc(funcSpec);
    }

    public List<String> getCacheFiles() {
        return cacheFiles;
    }

    public void setCacheFiles(List<String> cf) {
        cacheFiles = cf;
    }

    public List<String> getShipFiles() {
        return shipFiles;
    }

    public void setShipFiles(List<String> sf) {
        shipFiles = sf;
    }

    /** @return true if the UDF is Algebraic and may be run in the combiner. */
    public boolean combinable() {
        return (func instanceof Algebraic);
    }

    /**
     * Deep-ish clone with a fresh OperatorKey. Note that origFSpec, signature
     * sync via setSignature, and other transient state are not all carried over
     * here — only resultType, signature, cacheFiles and shipFiles are copied.
     */
    @Override
    public POUserFunc clone() throws CloneNotSupportedException {
        // Inputs will be patched up later by PhysicalPlan.clone()
        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope,
                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
                requestedParallelism, null, funcSpec.clone());
        clone.setResultType(resultType);
        clone.signature = signature;
        clone.cacheFiles = cacheFiles;
        clone.shipFiles = shipFiles;
        return clone;
    }

    /** Re-creates the transient UDF instance after Java deserialization. */
    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
        is.defaultReadObject();
        instantiateFunc(funcSpec);
    }

    /**
     * Get child expression of this expression
     */
    @Override
    public List<ExpressionOperator> getChildExpressions() {
        return null;
    }

    /** Starts a new accumulation cycle, resetting the accumulator's state. */
    @SuppressWarnings("unchecked")
    @Override
    public void setAccumStart() {
        if (isAccumulative() && !isAccumStarted()) {
            super.setAccumStart();
            ((Accumulator)func).cleanup();
        }
    }

    @Override
    public void setResultType(byte resultType) {
        this.resultType = resultType;
    }

    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        return (Tuple) out;
    }

    public EvalFunc getFunc() {
        return func;
    }

    public String getSignature() {
        return signature;
    }

    /** Records the signature and forwards it to the UDF's UDFContext hook. */
    public void setSignature(String signature) {
        this.signature = signature;
        if (this.func!=null) {
            this.func.setUDFContextSignature(signature);
        }
    }

    /**
     * Sets EvalFunc's inputschema based on the signature
     * @param signature key suffix under which the frontend stored the schema
     *        in the UDF's UDFContext properties
     */
    public void setFuncInputSchema(String signature) {
        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
        Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
        if(tmpS!=null) {
            this.func.setInputSchema(tmpS);
        }
    }

    /** Delegates: does the UDF want to be told when all input has been seen? */
    public boolean needEndOfAllInputProcessing() {
        return getFunc().needEndOfAllInputProcessing();
    }

    public Credentials getCredentials() {
        return this.creds;
    }

    public void setCredentials(Credentials creds) {
        this.creds = creds;
    }
}