blob: 8ca294f4697c2b1158ca4adee7c700ae11df7a2e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
* The MapReduce Split operator.
* <p>
* The assumption here is that
* the logical to physical translation
* will create this dummy operator with
* just the filename under which the input
* branch will be stored and later loaded.
* Also the translation should make sure that
* appropriate filter operators are configured
* as outputs of this operator using the conditions
* specified in the LOSplit. So LOSplit will be converted
* into:
* | | |
* Filter1 Filter2 ... Filter3
* | | ... |
* | | ... |
* ---- POSplit -... ----
* This is different from the earlier implementation,
* in which POSplit wrote to side files after filtering
* and then loaded the appropriate file.
* <p>
* The approach followed here is as good as the old
* approach, if not better in many cases, because
* of the availability of attachInput. An optimization
* that can ensue is if there are multiple loads that
* load the same file, they can be merged into one and
* then the operators that take input from the load
* can be stored. This can be used when
* the mapPlan executes to read the file only once and
* attach the resulting tuple as inputs to all the
* operators that take input from this load.
* In some cases where the conditions are exclusive and
* some outputs are ignored, this approach can be worse.
* But this leads to easier management of the Split and
* also allows the data stored by the split
* job to be reused whenever necessary.
public class POSplit extends PhysicalOperator {
// NOTE(review): this view of the file is missing lines -- closing braces, javadoc
// delimiters and some statements are not present; confirm against the full source.
private static final long serialVersionUID = 1L;
* The filespec that is used to store and load the output of the split job
* which is the job containing the split
// NOTE(review): FileSpec does not appear among the visible imports -- confirm.
private FileSpec splitStore;
* The list of sub-plans the inner plan is composed of
private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
// Progress over myPlans for the current input tuple, one bit per sub-plan index.
// A clear bit marks a sub-plan that still has to be run: processPlan and
// getStreamCloseResult select the first clear bit via nextClearBit(0). When every
// bit is set (cardinality() == myPlans.size()), the next input tuple is read.
private BitSet processedSet = new BitSet();
// Set to true when processInput() reports STATUS_EOP inside getStreamCloseResult
// (false on OK/NULL); while true, the sub-plan being run is flagged endOfAllInput.
// Transient, so it is not part of the serialized form.
private transient boolean inpEOP = false;
* Constructs an operator with the specified key
* @param k the operator key
public POSplit(OperatorKey k) {
// NOTE(review): the body of this constructor is not visible in this view; presumably
// it delegates to POSplit(OperatorKey, int, List) with a default parallelism and no
// inputs -- confirm against the full file.
* Constructs an operator with the specified key
* and degree of parallelism
* @param k the operator key
* @param rp the degree of parallelism requested
public POSplit(OperatorKey k, int rp) {
// NOTE(review): the body of this constructor is not visible in this view; presumably
// it delegates to POSplit(OperatorKey, int, List) with no inputs -- confirm.
* Constructs an operator with the specified key and inputs
* @param k the operator key
* @param inp the inputs that this operator will read data from
public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
// NOTE(review): the body of this constructor is not visible in this view; presumably
// it delegates to POSplit(OperatorKey, int, List) with a default parallelism -- confirm.
/**
 * Creates a split operator identified by {@code k}, with the requested
 * degree of parallelism and the given input operators.
 *
 * @param k   key identifying this operator
 * @param rp  requested degree of parallelism
 * @param inp operators this split reads its data from
 */
public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
    // All state initialisation is delegated to PhysicalOperator.
    super(k, rp, inp);
}
// Entry point for a PhyPlanVisitor walking the physical plan.
// NOTE(review): the body is not visible in this view; presumably it dispatches to a
// split-specific callback on the visitor -- confirm against the full file.
public void visit(PhyPlanVisitor v) throws VisitorException {
public String name() {
return getAliasString() + "Split - " + mKey.toString();
public boolean supportsMultipleInputs() {
return false;
public boolean supportsMultipleOutputs() {
return true;
* Returns the name of the file associated with this operator
* @return the FileSpec associated with this operator
public FileSpec getSplitStore() {
return splitStore;
* Sets the name of the file associated with this operator
* @param splitStore the FileSpec used to store the data
public void setSplitStore(FileSpec splitStore) {
this.splitStore = splitStore;
* Returns the list of nested plans.
* @return the list of the nested plans
* @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter
public List<PhysicalPlan> getPlans() {
return myPlans;
* Appends the specified plan to the end of
* the nested input plan list
* @param inPlan plan to be appended to the list
public void addPlan(PhysicalPlan inPlan) {
// NOTE(review): the body is not visible in this view. getNextTuple reads new input only
// when processedSet.cardinality() == myPlans.size(), so the body presumably also keeps
// processedSet in step with myPlans when appending -- confirm against the full file.
* Removes plan from
* the nested input plan list
* @param plan plan to be removed
public void removePlan(PhysicalPlan plan) {
// NOTE(review): the body is not visible in this view; presumably it removes the plan
// from myPlans and adjusts processedSet to match -- confirm against the full file.
// Returns the next result produced by the nested sub-plans.
// If the parent plan has reached end of all input, work is handed to
// getStreamCloseResult(). Otherwise, once every sub-plan is marked processed in
// processedSet, the next input tuple is read (EOP and ERR are returned as-is), and
// processPlan() then runs the first sub-plan not yet marked processed.
public Result getNextTuple() throws ExecException {
if (this.parentPlan.endOfAllInput) {
return getStreamCloseResult();
// Every sub-plan is done with the current tuple: fetch the next input.
if (processedSet.cardinality() == myPlans.size()) {
Result inp = processInput();
// Input exhausted while the parent plan is closing the stream.
if (inp.returnStatus == POStatus.STATUS_EOP && this.parentPlan.endOfAllInput) {
return getStreamCloseResult();
if (inp.returnStatus == POStatus.STATUS_EOP
|| inp.returnStatus == POStatus.STATUS_ERR ) {
return inp;
Tuple tuple = (Tuple)inp.result;
// NOTE(review): the loop body and the statements following it are not visible in this
// view; presumably `tuple` is attached to each sub-plan and processedSet is reset so
// that processPlan() starts again from the first sub-plan -- confirm.
for (PhysicalPlan pl : myPlans) {
return processPlan();
// Runs the first sub-plan whose bit in processedSet is still clear, pulling results
// from the first leaf of that sub-plan through runPipeline(). When the sub-plan reports
// EOP, processPlan() recurses to continue with the next unprocessed sub-plan.
// Only STATUS_OK and STATUS_ERR results reach the caller; any other status is
// reported as RESULT_EMPTY.
// NOTE(review): if myPlans is empty, nextClearBit(0) returns 0 and myPlans.get(0)
// throws IndexOutOfBoundsException.
private Result processPlan() throws ExecException {
int idx = processedSet.nextClearBit(0);
PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
Result res = runPipeline(leaf);
if (res.returnStatus == POStatus.STATUS_EOP) {
// NOTE(review): no statement marking plan `idx` as processed or advancing idx is
// visible here. As shown, idx < myPlans.size() always holds (get(idx) succeeded), so
// the recursion would never advance; presumably something like processedSet.set(idx++)
// is missing from this view -- confirm.
if (idx < myPlans.size()) {
res = processPlan();
return (res.returnStatus == POStatus.STATUS_OK ||
res.returnStatus == POStatus.STATUS_ERR ) ? res : RESULT_EMPTY;
// Repeatedly pulls the next result from `leaf`, branching on its status
// (OK / NULL / EOP / ERR), and returns the last result obtained.
// NOTE(review): the branch bodies are not visible in this view, so as shown the loop has
// no exit; presumably OK/EOP/ERR leave the loop and NULL continues pulling -- confirm.
private Result runPipeline(PhysicalOperator leaf) throws ExecException {
Result res = null;
while (true) {
res = leaf.getNextTuple();
if (res.returnStatus == POStatus.STATUS_OK) {
} else if (res.returnStatus == POStatus.STATUS_NULL) {
} else if (res.returnStatus == POStatus.STATUS_EOP) {
} else if (res.returnStatus == POStatus.STATUS_ERR) {
return res;
// Produces results while the parent plan is at end of all input (stream-close mode).
// Each loop iteration does the following:
// 1. If every sub-plan is marked processed, it reads the next input.
//    STATUS_OK clears inpEOP, STATUS_EOP sets inpEOP, STATUS_NULL clears inpEOP,
//    and STATUS_ERR is returned immediately.
// 2. It takes the first unprocessed sub-plan and, if input has hit EOP, flags that
//    sub-plan endOfAllInput.
// 3. It pulls one result from that sub-plan's first leaf.
// NOTE(review): several statements are not visible in this view: the per-plan loop body,
// the processedSet updates and the loop's break/continue exits. Confirm the exact control
// flow against the full file.
private Result getStreamCloseResult() throws ExecException {
Result res = null;
while (true) {
if (processedSet.cardinality() == myPlans.size()) {
Result inp = processInput();
if (inp.returnStatus == POStatus.STATUS_OK) {
Tuple tuple = (Tuple)inp.result;
// NOTE(review): loop body not visible; presumably attaches `tuple` to each sub-plan.
for (PhysicalPlan pl : myPlans) {
inpEOP = false;
} else if (inp.returnStatus == POStatus.STATUS_EOP){
inpEOP = true;
} else if (inp.returnStatus == POStatus.STATUS_NULL) {
inpEOP = false;
} else if (inp.returnStatus == POStatus.STATUS_ERR) {
return inp;
int idx = processedSet.nextClearBit(0);
// No more input will arrive: let the selected sub-plan know.
if (inpEOP ) {
myPlans.get(idx).endOfAllInput = true;
PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
res = leaf.getNextTuple();
if (res.returnStatus == POStatus.STATUS_EOP) {
if (idx < myPlans.size()) {
} else {
if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {
} else {
return res;
public POSplit clone() throws CloneNotSupportedException {
POSplit opClone = (POSplit) super.clone();
opClone.processedSet = new BitSet();
opClone.myPlans = clonePlans(myPlans);
return opClone;
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
// no op
return null;