| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.pig.PigException; |
| import org.apache.pig.PigWarning; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.SingleTupleBag; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.plan.NodeIdGenerator; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.impl.plan.VisitorException; |
| |
| /** |
| * Implements the overloaded form of the project operator. |
| * Projects the specified column from the input tuple. |
| * However, if asked for tuples when the input is a bag, |
| * the overloaded form is invoked and the project streams |
| * the tuples through instead of the bag. |
| */ |
| public class POProject extends ExpressionOperator { |
| |
| /** |
| * |
| */ |
| private static final long serialVersionUID = 1L; |
| |
| private boolean resultSingleTupleBag = false; |
| |
| //The column to project |
| protected ArrayList<Integer> columns; |
| |
| //True if we are in the middle of streaming tuples |
| //in a bag |
| private boolean processingBagOfTuples = false; |
| |
| //The bag iterator used while straeming tuple |
| private transient Iterator<Tuple> bagIterator = null; |
| |
| //Represents the fact that this instance of POProject |
| //is overloaded to stream tuples in the bag rather |
| //than passing the entire bag. It is the responsibility |
| //of the translator to set this. |
| protected boolean overloaded = false; |
| |
| |
| protected boolean isProjectToEnd = false; |
| protected int startCol; |
| |
| public POProject(OperatorKey k) { |
| this(k,-1,0); |
| } |
| |
| public POProject(OperatorKey k, int rp) { |
| this(k, rp, 0); |
| } |
| |
| public POProject(OperatorKey k, int rp, int col) { |
| super(k, rp); |
| columns = new ArrayList<Integer>(); |
| columns.add(col); |
| } |
| |
| public POProject(OperatorKey k, int rp, ArrayList<Integer> cols) { |
| super(k, rp); |
| columns = cols; |
| } |
| |
| public void setProjectToEnd(int startCol){ |
| this.isProjectToEnd = true; |
| this.startCol = startCol; |
| columns = new ArrayList<Integer>(); |
| } |
| |
| @Override |
| public String name() { |
| |
| String str = "Project" + "[" + DataType.findTypeName(resultType) + "]"; |
| |
| if(isStar()){ |
| str += "[*]"; |
| }else if(isProjectToEnd){ |
| str += "[" + startCol + " .. " + "]"; |
| }else{ |
| str += columns; |
| } |
| |
| str += " - " + mKey.toString(); |
| return str; |
| |
| } |
| |
| @Override |
| public boolean supportsMultipleInputs() { |
| return false; |
| } |
| |
| @Override |
| public boolean supportsMultipleOutputs() { |
| return false; |
| } |
| |
| @Override |
| public void visit(PhyPlanVisitor v) throws VisitorException { |
| v.visitProject(this); |
| } |
| |
| |
| /** |
| * Overridden since the attachment of the new input |
| * should cause the old processing to end. |
| */ |
| @Override |
| public void attachInput(Tuple t) { |
| super.attachInput(t); |
| processingBagOfTuples = false; |
| } |
| |
| /** |
| * Fetches the input tuple and returns the requested |
| * column |
| * @return next value. |
| * @throws ExecException |
| */ |
| public Result getNext() throws ExecException{ |
| Result res = processInput(); |
| Tuple inpValue = (Tuple)res.result; |
| |
| Object ret; |
| if(res.returnStatus != POStatus.STATUS_OK){ |
| return res; |
| } |
| if (isStar()) { |
| illustratorMarkup(inpValue, res.result, -1); |
| return res; |
| } else if(columns.size() == 1) { |
| if ( inpValue == null ) { |
| // the tuple is null, so a dereference should also produce a null |
| res.returnStatus = POStatus.STATUS_OK; |
| ret = null; |
| } else if( inpValue.size() > columns.get(0) ) { |
| ret = inpValue.get(columns.get(0)); |
| } else { |
| if(pigLogger != null) { |
| pigLogger.warn(this,"Attempt to access field " + |
| "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); |
| } |
| res.returnStatus = POStatus.STATUS_OK; |
| ret = null; |
| } |
| } else if(isProjectToEnd){ |
| ret = getRangeTuple(inpValue); |
| } |
| else { |
| ArrayList<Object> objList = new ArrayList<Object>(columns.size()); |
| |
| for(int col : columns) { |
| addColumn(objList, inpValue, col); |
| } |
| ret = mTupleFactory.newTupleNoCopy(objList); |
| } |
| res.result = ret; |
| illustratorMarkup(inpValue, res.result, -1); |
| return res; |
| } |
| |
| private boolean isRangeInvalid(int lastColIdx) { |
| if(startCol > lastColIdx){ |
| // this must be happening because tuple is smaller than startCol |
| if(pigLogger != null) { |
| pigLogger.warn(this, "Invalid range being projected," + |
| " startCol postition" + startCol + " greater than tuple size", |
| PigWarning.PROJECTION_INVALID_RANGE |
| ); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Add i'th column from inpValue to objList |
| * @param objList |
| * @param inpValue |
| * @param i |
| * @throws ExecException |
| */ |
| private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i) |
| throws ExecException { |
| if( inpValue == null ) { |
| // the tuple is null, so a dereference should also produce a null |
| objList.add(null); |
| } else if( inpValue.size() > i ) { |
| objList.add(inpValue.get(i)); |
| } else { |
| if(pigLogger != null) { |
| pigLogger.warn(this,"Attempt to access field " + i + |
| " which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); |
| } |
| objList.add(null); |
| } |
| } |
| |
| @Override |
| public Result getNextDataBag() throws ExecException { |
| |
| Result res = processInputBag(); |
| if(res.returnStatus!=POStatus.STATUS_OK) |
| return res; |
| return(consumeInputBag(res)); |
| } |
| |
| /** |
| * @param input |
| * @throws ExecException |
| */ |
| protected Result consumeInputBag(Result input) throws ExecException { |
| if(isInputAttached() || isStar()){ |
| Result retVal = new Result(); |
| retVal.result = input.result; |
| retVal.returnStatus = POStatus.STATUS_OK; |
| detachInput(); |
| return retVal; |
| } |
| |
| if (input.result instanceof DataBag) { |
| DataBag inpBag = (DataBag) input.result; |
| Result retVal = new Result(); |
| |
| DataBag outBag; |
| if(resultSingleTupleBag) { |
| // we have only one tuple in a bag - so create |
| // A SingleTupleBag for the result and fill it |
| // appropriately from the input bag |
| Tuple tuple = inpBag.iterator().next(); |
| if(!isProjectToEnd){ |
| ArrayList<Object> objList = new ArrayList<Object>(columns.size()); |
| for (int col : columns) { |
| addColumn(objList, tuple, col); |
| } |
| outBag = new SingleTupleBag( mTupleFactory.newTupleNoCopy(objList) ); |
| }else { |
| Tuple tmpTuple = getRangeTuple(tuple); |
| outBag = new SingleTupleBag(tmpTuple); |
| } |
| } else { |
| outBag = mBagFactory.newDefaultBag(); |
| for (Tuple tuple : inpBag) { |
| if(!isProjectToEnd){ |
| ArrayList<Object> objList = new ArrayList<Object>(columns.size()); |
| for (int col : columns) { |
| addColumn(objList, tuple, col); |
| } |
| outBag.add( mTupleFactory.newTupleNoCopy(objList) ); |
| }else{ |
| Tuple outTuple = getRangeTuple(tuple); |
| outBag.add(outTuple); |
| } |
| } |
| } |
| retVal.result = outBag; |
| retVal.returnStatus = POStatus.STATUS_OK; |
| return retVal; |
| } else if (input.result instanceof Tuple) { |
| // if input is tuple, columns should only have one item |
| Result retVal = new Result(); |
| retVal.result = ((Tuple)input.result).get(columns.get(0)); |
| retVal.returnStatus = POStatus.STATUS_OK; |
| return retVal; |
| } else if (input.result==null) { |
| Result retVal = new Result(); |
| retVal.result = null; |
| retVal.returnStatus = POStatus.STATUS_OK; |
| return retVal; |
| } else { |
| throw new ExecException("Cannot dereference a bag from " + input.result.getClass().getName(), 1129); |
| } |
| } |
| |
| private Tuple getRangeTuple(Tuple tuple) throws ExecException { |
| int lastColIdx = tuple.size() - 1; |
| Tuple outTuple; |
| if(isRangeInvalid(lastColIdx)){ |
| //invalid range - return empty tuple |
| outTuple = mTupleFactory.newTuple(); |
| } |
| else { |
| ArrayList<Object> objList = new ArrayList<Object>(lastColIdx - startCol + 1); |
| for(int i = startCol; i <= lastColIdx ; i++){ |
| addColumn(objList, tuple, i); |
| } |
| outTuple = mTupleFactory.newTupleNoCopy(objList); |
| } |
| return outTuple; |
| } |
| |
| @Override |
| public Result getNextDataByteArray() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextDouble() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextFloat() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextInteger() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextLong() throws ExecException { |
| return getNext(); |
| } |
| |
| |
| |
| @Override |
| public Result getNextBoolean() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextDateTime() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextMap() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextString() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextBigInteger() throws ExecException { |
| return getNext(); |
| } |
| |
| @Override |
| public Result getNextBigDecimal() throws ExecException { |
| return getNext(); |
| } |
| |
| /** |
| * Asked for Tuples. Check if the input is a bag. |
| * If so, stream the tuples in the bag instead of |
| * the entire bag. |
| */ |
| @Override |
| public Result getNextTuple() throws ExecException { |
| Result res = new Result(); |
| if(!processingBagOfTuples){ |
| Tuple inpValue = null; |
| res = processInput(); |
| if(res.returnStatus!=POStatus.STATUS_OK) |
| return res; |
| if(isStar()) |
| return res; |
| |
| inpValue = (Tuple)res.result; |
| res.result = null; |
| |
| Object ret; |
| |
| if(columns.size() == 1) { |
| if( inpValue == null ) { |
| // the tuple is null, so a dereference should also produce a null |
| ret = null; |
| } else if( inpValue.size() > columns.get(0) ) { |
| ret = inpValue.get(columns.get(0)); |
| } else { |
| if(pigLogger != null) { |
| pigLogger.warn(this,"Attempt to access field " + |
| "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); |
| } |
| ret = null; |
| } |
| } else if(isProjectToEnd) { |
| ret = getRangeTuple(inpValue); |
| } else { |
| ArrayList<Object> objList = new ArrayList<Object>(columns.size()); |
| |
| for(int col: columns) { |
| if( inpValue == null ) { |
| // the tuple is null, so a dereference should also produce a null |
| objList.add(null); |
| } else if( inpValue.size() > col ) { |
| objList.add(inpValue.get(col)); |
| } else { |
| if(pigLogger != null) { |
| pigLogger.warn(this,"Attempt to access field " + |
| "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); |
| } |
| objList.add(null); |
| } |
| } |
| ret = mTupleFactory.newTuple(objList); |
| res.result = (Tuple)ret; |
| return res; |
| } |
| |
| if(overloaded){ |
| if (ret!=null) { |
| DataBag retBag = (DataBag)ret; |
| bagIterator = retBag.iterator(); |
| if(bagIterator.hasNext()){ |
| processingBagOfTuples = true; |
| res.result = bagIterator.next(); |
| } |
| // If the bag contains no tuple, set the returnStatus to STATUS_EOP |
| if (!processingBagOfTuples) |
| res.returnStatus = POStatus.STATUS_EOP; |
| } else { |
| res.returnStatus = POStatus.STATUS_EOP; |
| } |
| } |
| else { |
| res.result = (Tuple)ret; |
| } |
| return res; |
| } |
| if(bagIterator.hasNext()){ |
| res.result = bagIterator.next(); |
| res.returnStatus = POStatus.STATUS_OK; |
| return res; |
| } |
| else{ |
| //done processing the bag of tuples |
| processingBagOfTuples = false; |
| return getNextTuple(); |
| } |
| } |
| |
| public ArrayList<Integer> getColumns(){ |
| if(isProjectToEnd) { |
| throw new AssertionError("Internal error. Improper use of method getColumns() in " |
| + POProject.class.getSimpleName()); |
| } |
| return columns; |
| } |
| |
| public int getColumn() throws ExecException { |
| if(columns.size() != 1 || isProjectToEnd) { |
| int errCode = 2068; |
| String msg = "Internal error. Improper use of method getColumn() in " |
| + POProject.class.getSimpleName(); |
| throw new ExecException(msg, errCode, PigException.BUG); |
| } |
| return columns.get(0); |
| } |
| |
| public int getStartCol(){ |
| return startCol; |
| } |
| |
| public void setColumns(ArrayList<Integer> cols) { |
| if(isProjectToEnd){ |
| throw new AssertionError("Columns should not be set for range projection"); |
| } |
| this.columns = cols; |
| } |
| |
| public void setColumn(int col) { |
| isProjectToEnd = false; |
| if(null == columns) { |
| columns = new ArrayList<Integer>(); |
| } else { |
| columns.clear(); |
| } |
| columns.add(col); |
| } |
| |
| public boolean isOverloaded() { |
| return overloaded; |
| } |
| |
| public void setOverloaded(boolean overloaded) { |
| this.overloaded = overloaded; |
| } |
| |
| public boolean isStar() { |
| return isProjectToEnd && startCol == 0; |
| } |
| |
| public boolean isProjectToEnd(){ |
| return isProjectToEnd; |
| } |
| |
| public void setStar(boolean star) { |
| if(star){ |
| isProjectToEnd = true; |
| startCol = 0; |
| }else{ |
| isProjectToEnd = false; |
| } |
| } |
| |
| @Override |
| public POProject clone() throws CloneNotSupportedException { |
| ArrayList<Integer> cols = new ArrayList<Integer>(columns.size()); |
| // Can resuse the same Integer objects, as they are immutable |
| for (Integer i : columns) { |
| cols.add(i); |
| } |
| POProject clone = new POProject(new OperatorKey(mKey.scope, |
| NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), |
| requestedParallelism, cols); |
| clone.cloneHelper(this); |
| clone.overloaded = overloaded; |
| clone.startCol = startCol; |
| clone.isProjectToEnd = isProjectToEnd; |
| clone.resultType = resultType; |
| return clone; |
| } |
| |
| protected Result processInputBag() throws ExecException { |
| |
| Result res = new Result(); |
| if (input==null && (inputs == null || inputs.size()==0)) { |
| // log.warn("No inputs found. Signaling End of Processing."); |
| res.returnStatus = POStatus.STATUS_EOP; |
| return res; |
| } |
| |
| //Should be removed once the model is clear |
| if(getReporter()!=null) { |
| getReporter().progress(); |
| } |
| |
| if(!isInputAttached()) { |
| if (inputs.get(0).getResultType()==DataType.BAG) |
| return inputs.get(0).getNextDataBag(); |
| else |
| return inputs.get(0).getNextTuple(); |
| } |
| else{ |
| res.result = (DataBag)input.get(columns.get(0)); |
| res.returnStatus = POStatus.STATUS_OK; |
| return res; |
| } |
| } |
| |
| public void setResultSingleTupleBag(boolean resultSingleTupleBag) { |
| this.resultSingleTupleBag = resultSingleTupleBag; |
| } |
| |
| @Override |
| public List<ExpressionOperator> getChildExpressions() { |
| return null; |
| } |
| |
| @Override |
| public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { |
| if(illustrator != null) { |
| |
| } |
| return null; |
| } |
| } |