blob: e9fa5a61042d919e90a8da6fd669b30b2ec77786 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFEndOfAllInputNeededVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.TupleMaker;
import org.apache.pig.data.UnlimitedNullTuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.LineageTracer;
//We intentionally skip type checking in backend for performance reasons
@SuppressWarnings("unchecked")
public class POForEach extends PhysicalOperator {
private static final long serialVersionUID = 1L;
private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
protected List<PhysicalPlan> inputPlans;
protected List<PhysicalOperator> opsToBeReset;
protected PhysicalOperator[] planLeafOps;
// store result types of the plan leaves
protected byte[] resultTypes;
// array version of isToBeFlattened - this is purely
// for optimization - instead of calling isToBeFlattened.get(i)
// we can do the quicker array access - isToBeFlattenedArray[i].
// Also we can store "boolean" values rather than "Boolean" objects
// so we can also save on the Boolean.booleanValue() calls
protected boolean[] isToBeFlattenedArray;
private int[] flattenNumFields = null;
protected int noItems;
//Since the plan has a generate, this needs to be maintained
//as the generate can potentially return multiple tuples for
//same call.
protected transient boolean processingPlan;
//its holds the iterators of the databags given by the input expressions which need flattening.
protected transient Iterator<Tuple> [] its = null;
//This holds the outputs given out by the input expressions of any datatype
protected transient Object[] bags ;
//This is the template whcih contains tuples and is flattened out in createTuple() to generate the final output
protected transient Object[] data;
// store whether or not an accumulative UDF has terminated early
protected transient BitSet earlyTermination;
protected transient ExampleTuple tIn;
protected transient AccumulativeTupleBuffer buffer;
protected transient Tuple inpTuple;
protected transient boolean endOfAllInputProcessed;
// Indicate the foreach statement can only in map side
// Currently only used in MR cross (See PIG-4175)
protected boolean mapSideOnly = false;
protected Boolean endOfAllInputProcessing = false;
private Schema schema;
public POForEach(OperatorKey k) {
this(k,-1,null,null);
}
public POForEach(OperatorKey k, int rp) {
this(k,rp,null,null);
}
public POForEach(OperatorKey k, List inp) {
this(k,-1,inp,null);
}
public POForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened){
super(k, rp);
setUpFlattens(isToBeFlattened);
this.inputPlans = inp;
opsToBeReset = new ArrayList<PhysicalOperator>();
getLeaves();
}
public POForEach(OperatorKey operatorKey, int requestedParallelism,
List<PhysicalPlan> innerPlans, List<Boolean> flattenList, Schema schema) {
this(operatorKey, requestedParallelism, innerPlans, flattenList);
this.schema = schema;
}
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitPOForEach(this);
}
@Override
public String name() {
return getAliasString() + "New For Each" + "(" + getFlatStr() + ")" + "["
+ DataType.findTypeName(resultType) + "]" + " - "
+ mKey.toString();
}
String getFlatStr() {
if(isToBeFlattenedArray ==null) {
return "";
}
StringBuilder sb = new StringBuilder();
for (Boolean b : isToBeFlattenedArray) {
sb.append(b);
sb.append(',');
}
if(sb.length()>0){
sb.deleteCharAt(sb.length()-1);
}
return sb.toString();
}
@Override
public boolean supportsMultipleInputs() {
return false;
}
@Override
public boolean supportsMultipleOutputs() {
return false;
}
@Override
public void setAccumulative() {
super.setAccumulative();
for(PhysicalPlan p : inputPlans) {
Iterator<PhysicalOperator> iter = p.iterator();
while(iter.hasNext()) {
PhysicalOperator po = iter.next();
if (po instanceof ExpressionOperator || po instanceof PODistinct) {
po.setAccumulative();
}
}
}
}
@Override
public void setAccumStart() {
super.setAccumStart();
for(PhysicalPlan p : inputPlans) {
Iterator<PhysicalOperator> iter = p.iterator();
while(iter.hasNext()) {
PhysicalOperator po = iter.next();
if (po instanceof ExpressionOperator || po instanceof PODistinct) {
po.setAccumStart();
}
}
}
}
@Override
public void setAccumEnd() {
super.setAccumEnd();
for(PhysicalPlan p : inputPlans) {
Iterator<PhysicalOperator> iter = p.iterator();
while(iter.hasNext()) {
PhysicalOperator po = iter.next();
if (po instanceof ExpressionOperator || po instanceof PODistinct) {
po.setAccumEnd();
}
}
}
}
/**
* Calls getNext on the generate operator inside the nested
* physical plan and returns it maintaining an additional state
* to denote the begin and end of the nested plan processing.
*/
@Override
public Result getNextTuple() throws ExecException {
try {
Result res = null;
Result inp = null;
//The nested plan is under processing
//So return tuples that the generate oper
//returns
if(processingPlan){
while(true) {
res = processPlan();
if(res.returnStatus==POStatus.STATUS_OK) {
return res;
}
if(res.returnStatus==POStatus.STATUS_EOP) {
processingPlan = false;
for(PhysicalPlan plan : inputPlans) {
plan.detachInput();
}
break;
}
if(res.returnStatus==POStatus.STATUS_ERR) {
return res;
}
if(res.returnStatus==POStatus.STATUS_NULL) {
continue;
}
}
}
//The nested plan processing is done or is
//yet to begin. So process the input and start
//nested plan processing on the input tuple
//read
while (true) {
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_ERR) {
return inp;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
if (inp.returnStatus == POStatus.STATUS_EOP) {
if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
// continue pull one more output
inp = UNLIMITED_NULL_RESULT;
} else {
return inp;
}
}
attachInputToPlans((Tuple) inp.result);
inpTuple = (Tuple)inp.result;
for (PhysicalOperator po : opsToBeReset) {
po.reset();
}
if (isAccumulative()) {
for(int i=0; i<inpTuple.size(); i++) {
if (inpTuple.getType(i) == DataType.BAG) {
// we only need to check one bag, because all the bags
// share the same buffer
buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
break;
}
}
setAccumStart();
while(true) {
if (!isEarlyTerminated() && buffer.hasNextBatch()) {
try {
buffer.nextBatch();
}catch(IOException e) {
throw new ExecException(e);
}
}else{
if (buffer instanceof POPackage.POPackageTupleBuffer) {
inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
}
setAccumEnd();
}
res = processPlan();
if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
// attach same input again to process next batch
attachInputToPlans((Tuple) inp.result);
} else if (res.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
//if this bubbled up, then we just need to pass a null value through the pipe
//so that POUserFunc will properly return the values
attachInputToPlans(null);
earlyTerminate();
} else {
break;
}
}
buffer.clear();
} else {
res = processPlan();
}
processingPlan = true;
return res;
}
} catch (RuntimeException e) {
throw new ExecException("Error while executing ForEach at " + this.getOriginalLocations(), e);
}
}
private boolean isEarlyTerminated = false;
private TupleMaker<? extends Tuple> tupleMaker;
private boolean knownSize = false;
private boolean isEarlyTerminated() {
return isEarlyTerminated;
}
private void earlyTerminate() {
isEarlyTerminated = true;
}
protected Result processPlan() throws ExecException{
if (schema != null && tupleMaker == null) {
// Note here that if SchemaTuple is currently turned on, then any UDF's in the chain
// must follow good practices. Namely, they should not append to the Tuple that comes
// out of an iterator (a practice which is fairly common, but is not recommended).
tupleMaker = SchemaTupleFactory.getInstance(schema, false, GenContext.FOREACH);
if (tupleMaker != null) {
knownSize = true;
}
}
if (tupleMaker == null) {
tupleMaker = TupleFactory.getInstance();
}
Result res = new Result();
//We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
if(its != null) {
boolean restartIts = true;
for(int i = 0; i < noItems; ++i) {
if(its[i] != null && isToBeFlattenedArray[i] == true) {
restartIts &= !its[i].hasNext();
}
}
//this means that all the databags have reached their last elements. so we need to force reading of fresh databags
if(restartIts) {
its = null;
data = null;
}
}
if(its == null) {
if (endOfAllInputProcessed) {
return RESULT_EOP;
}
//getNext being called for the first time OR starting with a set of new data from inputs
its = new Iterator[noItems];
bags = new Object[noItems];
earlyTermination = new BitSet(noItems);
for(int i = 0; i < noItems; ++i) {
//Getting the iterators
//populate the input data
Result inputData = null;
switch(resultTypes[i]) {
case DataType.BAG:
case DataType.TUPLE :
case DataType.BYTEARRAY :
case DataType.MAP :
case DataType.BOOLEAN :
case DataType.INTEGER :
case DataType.DOUBLE :
case DataType.LONG :
case DataType.FLOAT :
case DataType.BIGINTEGER :
case DataType.BIGDECIMAL :
case DataType.DATETIME :
case DataType.CHARARRAY :
inputData = planLeafOps[i].getNext(resultTypes[i]);
break;
default: {
int errCode = 2080;
String msg = "Foreach currently does not handle type " + DataType.findTypeName(resultTypes[i]);
throw new ExecException(msg, errCode, PigException.BUG);
}
}
//we accrue information about what accumulators have early terminated
//in the case that they all do, we can finish
if (inputData.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
if (!earlyTermination.get(i))
earlyTermination.set(i);
continue;
}
if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {
continue;
}
if(inputData.returnStatus == POStatus.STATUS_EOP) {
//we are done with all the elements. Time to return.
its = null;
bags = null;
return inputData;
}
// if we see a error just return it
if(inputData.returnStatus == POStatus.STATUS_ERR) {
return inputData;
}
// Object input = null;
bags[i] = inputData.result;
if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
its[i] = ((DataBag)bags[i]).iterator();
} else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
its[i] = ((Map)bags[i]).entrySet().iterator();
} else {
// This includes FLATTEN(null) for bag and map
// in addition to non-null values from other data types
its[i] = null;
}
}
if (parentPlan!=null && parentPlan.endOfAllInput && endOfAllInputProcessing) {
endOfAllInputProcessed = true;
}
}
// if accumulating, we haven't got data yet for some fields, just return
if (isAccumulative() && isAccumStarted()) {
if (earlyTermination.cardinality() < noItems) {
res.returnStatus = POStatus.STATUS_BATCH_OK;
} else {
res.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
}
return res;
}
while(true) {
if(data == null) {
//getNext being called for the first time or starting on new input data
//we instantiate the template array and start populating it with data
data = new Object[noItems];
for(int i = 0; i < noItems; ++i) {
if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
if(its[i].hasNext()) {
data[i] = its[i].next();
} else {
//the input set is null, so we return. This is
// caught above and this function recalled with
// new inputs.
its = null;
data = null;
res.returnStatus = POStatus.STATUS_NULL;
return res;
}
} else {
data[i] = bags[i];
}
}
if(getReporter()!=null) {
getReporter().progress();
}
res.result = createTuple(data);
res.returnStatus = POStatus.STATUS_OK;
return res;
} else {
//we try to find the last expression which needs flattening and start iterating over it
//we also try to update the template array
for(int index = noItems - 1; index >= 0; --index) {
if(its[index] != null && isToBeFlattenedArray[index]) {
if(its[index].hasNext()) {
data[index] = its[index].next();
res.result = createTuple(data);
res.returnStatus = POStatus.STATUS_OK;
return res;
}
else{
// reset this index's iterator so cross product can be achieved
// we would be resetting this way only for the indexes from the end
// when the first index which needs to be flattened has reached the
// last element in its iterator, we won't come here - instead, we reset
// all iterators at the beginning of this method.
if(bags[index] instanceof DataBag) {
its[index] = ((DataBag)bags[index]).iterator();
} else { // if (bags[i] instanceof Map)) {
its[index] = ((Map)bags[index]).entrySet().iterator();
}
data[index] = its[index].next();
}
}
}
}
}
//return null;
}
/**
*
* @param data array that is the template for the final flattened tuple
* @return the final flattened tuple
*/
protected Tuple createTuple(Object[] data) throws ExecException {
Tuple out = tupleMaker.newTuple();
int idx = 0;
for(int i = 0; i < data.length; ++i) {
Object in = data[i];
if(!isToBeFlattenedArray[i]) {
if (knownSize) {
out.set(idx++, in);
} else {
out.append(in);
}
} else if(in instanceof Tuple) {
Tuple t = (Tuple)in;
int size = t.size();
for(int j = 0; j < size; ++j) {
if (knownSize) {
out.set(idx++, t.get(j));
} else {
out.append(t.get(j));
}
}
} else if (in instanceof Map.Entry) {
Map.Entry entry = (Map.Entry)in;
if (knownSize) {
out.set(idx++, entry.getKey());
out.set(idx++, entry.getValue());
} else {
out.append(entry.getKey());
out.append(entry.getValue());
}
} else if (in == null &&
flattenNumFields != null && flattenNumFields[i] != 0 ) {
// Handling of FLATTEN(null) here.
// Expanding to multiple nulls depending on the schema
for( int j = 0; j < flattenNumFields[i]; j++ ) {
if (knownSize) {
out.set(idx++, null);
} else {
out.append(null);
}
}
} else {
if (knownSize) {
out.set(idx++, in);
} else {
out.append(in);
}
}
}
if (inpTuple != null) {
return illustratorMarkup(inpTuple, out, 0);
} else {
return illustratorMarkup2(data, out);
}
}
protected void attachInputToPlans(Tuple t) {
//super.attachInput(t);
for(PhysicalPlan p : inputPlans) {
p.attachInput(t);
}
}
public void getLeaves() {
if (inputPlans != null) {
int i=-1;
if(isToBeFlattenedArray == null) {
isToBeFlattenedArray = new boolean[inputPlans.size()];
}
planLeafOps = new PhysicalOperator[inputPlans.size()];
for(PhysicalPlan p : inputPlans) {
++i;
PhysicalOperator leaf = p.getLeaves().get(0);
planLeafOps[i] = leaf;
if(leaf instanceof POProject &&
leaf.getResultType() == DataType.TUPLE &&
((POProject)leaf).isProjectToEnd() ) {
isToBeFlattenedArray[i] = true;
}
}
}
// we are calculating plan leaves
// so lets reinitialize
reInitialize();
}
private void reInitialize() {
if(planLeafOps != null) {
noItems = planLeafOps.length;
resultTypes = new byte[noItems];
for (int i = 0; i < resultTypes.length; i++) {
resultTypes[i] = planLeafOps[i].getResultType();
}
} else {
noItems = 0;
resultTypes = null;
}
if(inputPlans != null) {
for (PhysicalPlan pp : inputPlans) {
try {
ResetFinder lf = new ResetFinder(pp, opsToBeReset);
lf.visit();
} catch (VisitorException ve) {
String errMsg = "Internal Error: Unexpected error looking for nested operators which need to be reset in FOREACH";
throw new RuntimeException(errMsg, ve);
}
}
}
}
public List<PhysicalPlan> getInputPlans() {
return inputPlans;
}
public void setInputPlans(List<PhysicalPlan> plans) {
inputPlans = plans;
planLeafOps = null;
getLeaves();
}
public void addInputPlan(PhysicalPlan plan, boolean flatten) {
inputPlans.add(plan);
// add to planLeafOps
// copy existing leaves
PhysicalOperator[] newPlanLeafOps = new PhysicalOperator[planLeafOps.length + 1];
for (int i = 0; i < planLeafOps.length; i++) {
newPlanLeafOps[i] = planLeafOps[i];
}
// add to the end
newPlanLeafOps[planLeafOps.length] = plan.getLeaves().get(0);
planLeafOps = newPlanLeafOps;
// add to isToBeFlattenedArray
// copy existing values
boolean[] newIsToBeFlattenedArray = new boolean[isToBeFlattenedArray.length + 1];
for(int i = 0; i < isToBeFlattenedArray.length; i++) {
newIsToBeFlattenedArray[i] = isToBeFlattenedArray[i];
}
// add to end
newIsToBeFlattenedArray[isToBeFlattenedArray.length] = flatten;
isToBeFlattenedArray = newIsToBeFlattenedArray;
// we just added a leaf - reinitialize
reInitialize();
}
public void setToBeFlattened(List<Boolean> flattens) {
setUpFlattens(flattens);
}
public List<Boolean> getToBeFlattened() {
List<Boolean> result = null;
if(isToBeFlattenedArray != null) {
result = new ArrayList<Boolean>();
for (int i = 0; i < isToBeFlattenedArray.length; i++) {
result.add(isToBeFlattenedArray[i]);
}
}
return result;
}
/**
* Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
@Override
public POForEach clone() throws CloneNotSupportedException {
List<PhysicalPlan> plans = new
ArrayList<PhysicalPlan>(inputPlans.size());
for (PhysicalPlan plan : inputPlans) {
plans.add(plan.clone());
}
List<Boolean> flattens = null;
if(isToBeFlattenedArray != null) {
flattens = new
ArrayList<Boolean>(isToBeFlattenedArray.length);
for (boolean b : isToBeFlattenedArray) {
flattens.add(b);
}
}
POForEach clone = new POForEach(new OperatorKey(mKey.scope,
NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
requestedParallelism, plans, flattens);
clone.setResultType(getResultType());
clone.addOriginalLocation(alias, getOriginalLocations());
clone.endOfAllInputProcessing = endOfAllInputProcessing;
clone.mapSideOnly = mapSideOnly;
clone.flattenNumFields = flattenNumFields;
return clone;
}
public boolean inProcessing()
{
return processingPlan;
}
public void setFlattenNumFields (int [] flattenNumFields) {
this.flattenNumFields = flattenNumFields;
}
protected void setUpFlattens(List<Boolean> isToBeFlattened) {
if(isToBeFlattened == null) {
isToBeFlattenedArray = null;
} else {
isToBeFlattenedArray = new boolean[isToBeFlattened.size()];
int i = 0;
for (Iterator<Boolean> it = isToBeFlattened.iterator(); it.hasNext();) {
isToBeFlattenedArray[i++] = it.next();
}
}
}
/**
* Visits a pipeline and calls reset on all the nodes. Currently only
* pays attention to limit nodes, each of which need to be told to reset
* their limit.
*/
private class ResetFinder extends PhyPlanVisitor {
ResetFinder(PhysicalPlan plan, List<PhysicalOperator> toBeReset) {
super(plan,
new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
}
@Override
public void visitDistinct(PODistinct d) throws VisitorException {
// FIXME: add only if limit is present
opsToBeReset.add(d);
}
@Override
public void visitLimit(POLimit limit) throws VisitorException {
opsToBeReset.add(limit);
}
@Override
public void visitSort(POSort sort) throws VisitorException {
// FIXME: add only if limit is present
opsToBeReset.add(sort);
}
@Override
public void visitCross(POCross c) throws VisitorException {
// FIXME: add only if limit is present
opsToBeReset.add(c);
}
@Override
public void visitProject(POProject proj) throws VisitorException {
if(proj instanceof PORelationToExprProject) {
opsToBeReset.add(proj);
}
}
}
/**
* @return the opsToBeReset
*/
public List<PhysicalOperator> getOpsToBeReset() {
return opsToBeReset;
}
/**
* @param opsToBeReset the opsToBeReset to set
*/
public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) {
this.opsToBeReset = opsToBeReset;
}
private Tuple illustratorMarkup2(Object[] in, Object out) {
if(illustrator != null) {
ExampleTuple tOut = new ExampleTuple((Tuple) out);
illustrator.getLineage().insert(tOut);
boolean synthetic = false;
for (Object tIn : in)
{
synthetic |= ((ExampleTuple) tIn).synthetic;
illustrator.getLineage().union(tOut, (Tuple) tIn);
}
illustrator.addData(tOut);
int i;
for (i = 0; i < noItems; ++i) {
if (((DataBag)bags[i]).size() < 2) {
break;
}
}
if (i >= noItems && !illustrator.getEqClassesShared()) {
illustrator.getEquivalenceClasses().get(0).add(tOut);
}
tOut.synthetic = synthetic;
return tOut;
} else {
return (Tuple) out;
}
}
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
ExampleTuple tOut = new ExampleTuple((Tuple) out);
illustrator.addData(tOut);
if (!illustrator.getEqClassesShared()) {
illustrator.getEquivalenceClasses().get(0).add(tOut);
}
LineageTracer lineageTracer = illustrator.getLineage();
lineageTracer.insert(tOut);
tOut.synthetic = ((ExampleTuple) in).synthetic;
lineageTracer.union((ExampleTuple) in , tOut);
return tOut;
} else {
return (Tuple) out;
}
}
public PhysicalOperator[] getPlanLeafOps() {
return planLeafOps;
}
public void setMapSideOnly(boolean mapSideOnly) {
this.mapSideOnly = mapSideOnly;
}
public boolean isMapSideOnly() {
return mapSideOnly;
}
public boolean needEndOfAllInputProcessing() throws ExecException {
try {
for (PhysicalPlan innerPlan : inputPlans) {
UDFEndOfAllInputNeededVisitor endOfAllInputNeededVisitor
= new UDFEndOfAllInputNeededVisitor(innerPlan);
endOfAllInputNeededVisitor.visit();
if (endOfAllInputNeededVisitor.needEndOfAllInputProcessing()) {
endOfAllInputProcessing = true;
return true;
}
}
return false;
} catch (Exception e) {
throw new ExecException(e);
}
}
}