blob: af548a4e1334cbe1a60ad1d0e2e3d12832f3a8aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.parser.SourceLocation;
import org.apache.pig.pen.Illustrable;
import org.apache.pig.pen.Illustrator;
import org.apache.pig.pen.util.LineageTracer;
/**
*
* This is the base class for all operators. This supports a generic way of
* processing inputs which can be overridden by operators extending this class.
* The input model assumes that it can either be taken from an operator or can
* be attached directly to this operator. Also it is assumed that inputs to an
* operator are always in the form of a tuple.
*
* For this pipeline rework, we assume a pull based model, i.e, the root
* operator is going to call getNext with the appropriate type which initiates a
* cascade of getNext calls that unroll to create input for the root operator to
* work on.
*
* Any operator that extends the PhysicalOperator, supports a getNext with all
* the different types of parameter types. The concrete implementation should
* use the result type of its input operator to decide the type of getNext's
* parameter. This is done to avoid switch/case based on the type as much as
* possible. The default is assumed to return an erroneus Result corresponding
* to an unsupported operation on that type. So the operators need to implement
* only those types that are supported.
*
*/
public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Illustrable, Cloneable {
private static final Log log = LogFactory.getLog(PhysicalOperator.class);
protected static final long serialVersionUID = 1L;
protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
protected static final BagFactory mBagFactory = BagFactory.getInstance();
// The degree of parallelism requested
protected int requestedParallelism;
// The inputs that this operator will read data from
protected List<PhysicalOperator> inputs;
// The outputs that this operator will write data to
// Will be used to create Targeted tuples
protected List<PhysicalOperator> outputs;
// The data type for the results of this operator
protected byte resultType = DataType.TUPLE;
// The physical plan this operator is part of
protected PhysicalPlan parentPlan;
// Specifies if the input has been directly attached
protected boolean inputAttached = false;
// If inputAttached is true, input is set to the input tuple
protected Tuple input = null;
// The result of performing the operation along with the output
protected Result res = null;
// alias associated with this PhysicalOperator
protected String alias = null;
// Will be used by operators to report status or transmit heartbeat
// Should be set by the backends to appropriate implementations that
// wrap their own version of a reporter.
protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
// Will be used by operators to aggregate warning messages
// Should be set by the backends to appropriate implementations that
// wrap their own version of a logger.
protected static PigLogger pigLogger;
// TODO: This is not needed. But a lot of tests check serialized physical plans
// that are sensitive to the serialized image of the contained physical operators.
// So for now, just keep it. Later it'll be cleansed along with those test golden
// files
protected LineageTracer lineageTracer;
protected transient Illustrator illustrator = null;
private boolean accum;
private transient boolean accumStart;
private List<OriginalLocation> originalLocations = new ArrayList<OriginalLocation>();
public PhysicalOperator(OperatorKey k) {
this(k, -1, null);
}
public PhysicalOperator(OperatorKey k, int rp) {
this(k, rp, null);
}
public PhysicalOperator(OperatorKey k, List<PhysicalOperator> inp) {
this(k, -1, inp);
}
public PhysicalOperator(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k);
requestedParallelism = rp;
inputs = inp;
res = new Result();
}
public PhysicalOperator(PhysicalOperator copy) {
super (copy.getOperatorKey());
this.res = new Result();
this.requestedParallelism = copy.requestedParallelism;
this.inputs = copy.inputs;
this.outputs = copy.outputs;
this.resultType = copy.resultType;
this.parentPlan = copy.parentPlan;
this.inputAttached = copy.inputAttached;
this.alias = copy.alias;
this.lineageTracer = copy.lineageTracer;
this.accum = copy.accum;
this.originalLocations = copy.originalLocations;
}
@Override
public void setIllustrator(Illustrator illustrator) {
this.illustrator = illustrator;
}
public Illustrator getIllustrator() {
return illustrator;
}
public int getRequestedParallelism() {
return requestedParallelism;
}
public void setRequestedParallelism(int requestedParallelism) {
this.requestedParallelism = requestedParallelism;
}
public byte getResultType() {
return resultType;
}
public String getAlias() {
return alias;
}
protected String getAliasString() {
return (alias == null) ? "" : (alias + ": ");
}
public void copyAliasFrom(PhysicalOperator op) {
this.alias = op.alias;
this.originalLocations = op.originalLocations;
}
public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
this.alias = alias;
this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
}
public void addOriginalLocation(String alias, List<OriginalLocation> originalLocations) {
this.alias = alias;
this.originalLocations.addAll(originalLocations);
}
public List<OriginalLocation> getOriginalLocations() {
return Collections.unmodifiableList(originalLocations);
}
public void setAccumulative() {
accum = true;
}
public boolean isAccumulative() {
return accum;
}
public void setAccumStart() {
if (!accum) {
throw new IllegalStateException("Accumulative is not turned on.");
}
accumStart = true;
}
public boolean isAccumStarted() {
return accumStart;
}
public void setAccumEnd() {
if (!accum){
throw new IllegalStateException("Accumulative is not turned on.");
}
accumStart = false;
}
public void setResultType(byte resultType) {
this.resultType = resultType;
}
public List<PhysicalOperator> getInputs() {
return inputs;
}
public void setInputs(List<PhysicalOperator> inputs) {
this.inputs = inputs;
}
public boolean isInputAttached() {
return inputAttached;
}
/**
* Shorts the input path of this operator by providing the input tuple
* directly
*
* @param t -
* The tuple that should be used as input
*/
public void attachInput(Tuple t) {
input = t;
this.inputAttached = true;
}
/**
* Detaches any tuples that are attached
*
*/
public void detachInput() {
input = null;
this.inputAttached = false;
}
/**
* A blocking operator should override this to return true. Blocking
* operators are those that need the full bag before operate on the tuples
* inside the bag. Example is the Global Rearrange. Non-blocking or pipeline
* operators are those that work on a tuple by tuple basis.
*
* @return true if blocking and false otherwise
*/
public boolean isBlocking() {
return false;
}
/**
* A generic method for parsing input that either returns the attached input
* if it exists or fetches it from its predecessor. If special processing is
* required, this method should be overridden.
*
* @return The Result object that results from processing the input
* @throws ExecException
*/
public Result processInput() throws ExecException {
try {
if (input == null && (inputs == null || inputs.size() == 0)) {
// log.warn("No inputs found. Signaling End of Processing.");
return RESULT_EOP;
}
// Should be removed once the model is clear
PigProgressable progRep = getReporter();
if (progRep != null) {
progRep.progress();
}
if (!isInputAttached()) {
return inputs.get(0).getNextTuple();
} else {
Result res = new Result();
res.result = input;
res.returnStatus = POStatus.STATUS_OK;
detachInput();
return res;
}
} catch (ExecException e) {
throw new ExecException("Exception while executing "
+ this.toString() + ": " + e.toString(), e);
}
}
@Override
public abstract void visit(PhyPlanVisitor v) throws VisitorException;
/**
* Implementations that call into the different versions of getNext are often
* identical, differing only in the signature of the getNext() call they make.
* This method allows to cut down on some of the copy-and-paste.
* @param dataType Describes the type of obj; a byte from DataType.
*
* @return result Result of applying this Operator to the Object.
* @throws ExecException
*/
public Result getNext(byte dataType) throws ExecException {
try {
switch (dataType) {
case DataType.BAG:
return getNextDataBag();
case DataType.BOOLEAN:
return getNextBoolean();
case DataType.BYTEARRAY:
return getNextDataByteArray();
case DataType.CHARARRAY:
return getNextString();
case DataType.DOUBLE:
return getNextDouble();
case DataType.FLOAT:
return getNextFloat();
case DataType.INTEGER:
return getNextInteger();
case DataType.LONG:
return getNextLong();
case DataType.BIGINTEGER:
return getNextBigInteger();
case DataType.BIGDECIMAL:
return getNextBigDecimal();
case DataType.DATETIME:
return getNextDateTime();
case DataType.MAP:
return getNextMap();
case DataType.TUPLE:
return getNextTuple();
default:
throw new ExecException("Unsupported type for getNext: " + DataType.findTypeName(dataType));
}
} catch (ExecException e) {
throw new ExecException("Exception while executing for " + originalLocationsToDescriptiveString()
+ ": " + e.toString(), e.getErrorCode(), e);
} catch (RuntimeException e) {
throw new ExecException("Exception while executing " + this.toString() + ": " + e.toString(), e);
}
}
private String originalLocationsToDescriptiveString() {
StringBuilder sb = new StringBuilder();
sb.append('[');
boolean isFirstItem = true;
for( OriginalLocation ol : getOriginalLocations() ) {
if( isFirstItem ) {
isFirstItem = false;
} else {
sb.append(",");
}
sb.append(ol.getAlias()).append("[line=").append(ol.getLine())
.append(",offset=").append(ol.getOffset()).append("]");
}
return sb.append(']').toString();
}
public Result getNextInteger() throws ExecException {
return res;
}
public Result getNextLong() throws ExecException {
return res;
}
public Result getNextDouble() throws ExecException {
return res;
}
public Result getNextFloat() throws ExecException {
return res;
}
public Result getNextDateTime() throws ExecException {
return res;
}
public Result getNextString() throws ExecException {
return res;
}
public Result getNextDataByteArray() throws ExecException {
return res;
}
public Result getNextMap() throws ExecException {
return res;
}
public Result getNextBoolean() throws ExecException {
return res;
}
public Result getNextTuple() throws ExecException {
return res;
}
public Result getNextDataBag() throws ExecException {
Result val = new Result();
DataBag tmpBag = mBagFactory.newDefaultBag();
for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
if (ret.returnStatus == POStatus.STATUS_ERR) {
return ret;
} else if (ret.returnStatus == POStatus.STATUS_NULL) {
continue;
} else {
tmpBag.add((Tuple) ret.result);
}
}
val.result = tmpBag;
val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
return val;
}
public Result getNextBigInteger() throws ExecException {
return res;
}
public Result getNextBigDecimal() throws ExecException {
return res;
}
/**
* Reset internal state in an operator. For use in nested pipelines
* where operators like limit and sort may need to reset their state.
* Limit needs it because it needs to know it's seeing a fresh set of
* input. Blocking operators like sort and distinct need it because they
* may not have drained their previous input due to a limit and thus need
* to be told to drop their old input and start over.
*/
public void reset() {
}
public boolean isEndOfAllInput() {
return parentPlan.endOfAllInput;
}
/**
* @return PigProgressable stored in threadlocal
*/
public static PigProgressable getReporter() {
return PhysicalOperator.reporter.get();
}
/**
* @param reporter PigProgressable to be stored in threadlocal
*/
public static void setReporter(PigProgressable reporter) {
PhysicalOperator.reporter.set(reporter);
}
//@StaticDataCleanup
public static void staticDataCleanup() {
reporter = new ThreadLocal<PigProgressable>();
}
/**
* Make a copy of this operator. This function is blank, however,
* we should leave a place holder so that the subclasses can clone
* to make deep copy as this one creates a shallow copy of
* non-primitive types (objects, arrays and lists)
*
* @throws CloneNotSupportedException
*/
@Override
public PhysicalOperator clone() throws CloneNotSupportedException {
return (PhysicalOperator)super.clone();
}
protected void cloneHelper(PhysicalOperator op) {
resultType = op.resultType;
originalLocations.addAll(op.originalLocations);
}
protected static List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws CloneNotSupportedException {
List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(origPlans.size());
for (PhysicalPlan plan : origPlans) {
clonePlans.add(plan.clone());
}
return clonePlans;
}
/**
* @param physicalPlan
*/
public void setParentPlan(PhysicalPlan physicalPlan) {
parentPlan = physicalPlan;
}
public PhysicalPlan getParentPlan() {
return parentPlan;
}
public Log getLogger() {
return log;
}
public static void setPigLogger(PigLogger logger) {
pigLogger = logger;
}
public static PigLogger getPigLogger() {
return pigLogger;
}
public static class OriginalLocation implements Serializable {
private String alias;
private int line;
private int offset;
public OriginalLocation(String alias, int line, int offset) {
super();
this.alias = alias;
this.line = line;
this.offset = offset;
}
public String getAlias() {
return alias;
}
public int getLine() {
return line;
}
public int getOffset() {
return offset;
}
@Override
public String toString() {
return alias+"["+line+","+offset+"]";
}
}
}