| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; |
| |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.PriorityQueue; |
| import java.util.Properties; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.Mapper.Context; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.FuncSpec; |
| import org.apache.pig.IndexableLoadFunc; |
| import org.apache.pig.LoadFunc; |
| import org.apache.pig.PigException; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.InternalCachedBag; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.PigImplConstants; |
| import org.apache.pig.impl.io.FileSpec; |
| import org.apache.pig.impl.plan.NodeIdGenerator; |
| import org.apache.pig.impl.plan.OperatorKey; |
| import org.apache.pig.impl.plan.VisitorException; |
| import org.apache.pig.impl.util.Pair; |
| |
/**
 * Physical operator implementing a map-side (merge) COGROUP.
 *
 * The base (left-most) relation is streamed through the mapper, while the
 * remaining "side" relations are read directly through {@link IndexableLoadFunc}
 * loaders. Tuples from all relations are merged via a priority queue ordered on
 * the group key; one output tuple of shape (key, bag-per-relation) is produced
 * per distinct key.
 *
 * NOTE(review): this operator assumes every relation is sorted on the group
 * key, and that a pre-built index over the base relation's splits (see
 * {@link #readIndex()}) is available so each map task knows where the next
 * split's keys begin — confirm against the merge-cogroup compilation step.
 */
public class POMergeCogroup extends PhysicalOperator {

    private static final long serialVersionUID = 1L;

    // Loaders opened at runtime for the side (non-base) relations; rebuilt in
    // readObject() and populated in setup().
    private transient List<LoadFunc> sideLoaders;

    // FuncSpecs used to instantiate the side loaders on the backend.
    private List<FuncSpec> sidFuncSpecs;

    // Input locations of the side relations, parallel to sidFuncSpecs.
    private List<String> sideFileSpecs;

    // Local Rearranges corresponding to side Loader to extract relevant bits
    // out of the tuple we get from side loaders. Index 0 belongs to the base
    // (mapped) relation.
    private POLocalRearrange LRs[];

    // True until setup() has been run on the first incoming tuple.
    private transient boolean firstTime;

    // First key of the next split of the base relation; null when this map is
    // processing the last split (see the comment in setup() for why null is
    // unambiguous here).
    private transient Comparable<Object> firstKeyOfNextSplit;

    // This is the count of all the relations involved in Cogroup. The Mapped
    // relation is also included in the count.
    private transient int relationCnt;

    private transient TupleFactory mTupleFactory;

    // Location of the index file over the base relation's splits.
    private String indexFileName;

    // LoadFunc spec used to read the index file.
    private FuncSpec idxFuncSpec;

    // One bag per relation, accumulating the tuples of the key currently
    // being grouped.
    private transient DataBag[] outBags;

    // Most recently polled tuple from the heap; field 1 carries the key of
    // the group currently being accumulated.
    private transient Tuple prevTopOfHeap;

    // UDF-context signatures, parallel to sidFuncSpecs.
    private List<String> loaderSignatures;

    // Set after an output tuple is produced so that fresh bags are created at
    // the start of the next getNextTuple() call.
    private transient boolean createNewBags;

    // Heap contains tuples with exactly three fields, which is same as output
    // of LR except for third field which contains full-fledged input tuple,
    // as oppose to key-stripped tuple generated by LR.
    // Field 0: relation index (Byte), field 1: key, field 2: original tuple.
    private transient PriorityQueue<Tuple> heap;

    // Set once the final output tuple for this split has been generated.
    private transient boolean lastTime;

    // Need to know in each getNext() whether we generated valid output in last
    // call or not.
    private transient boolean workingOnNewKey;

    private byte endOfRecordMark = POStatus.STATUS_EOP;

    /**
     * @param k       operator key for this physical operator.
     * @param inpPOs  predecessor operators (the mapped base relation).
     * @param lrs     one POLocalRearrange per relation (base at index 0), used
     *                to extract the group key from raw tuples.
     * @param parallel requested parallelism, passed through to the superclass.
     */
    public POMergeCogroup(OperatorKey k,List<PhysicalOperator> inpPOs,
            POLocalRearrange[] lrs, int parallel) {

        super(k,parallel,inpPOs);
        this.LRs = lrs;
        // Keep the full input tuple in the LR output (field 2 of heap tuples)
        // instead of the key-stripped value.
        for(int i=0; i < lrs.length; i++)
            LRs[i].setStripKeyFromValue(false);
    }

    // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez.
    // This is because:
    // For MR, we send EOP at the end of every record
    // For Tez, we only use a global EOP, so send NULL for end of record
    public void setEndOfRecordMark(byte endOfRecordMark) {
        this.endOfRecordMark = endOfRecordMark;
    }

    //For Spark
    // Spark has no parentPlan.endOfAllInput signal, so the end of the base
    // relation's input is flagged explicitly through this field.
    private transient boolean endOfInput = false;
    public boolean isEndOfInput() {
        return endOfInput;
    }
    public void setEndOfInput (boolean isEndOfInput) {
        endOfInput = isEndOfInput;
    }

    /**
     * Produces the next cogrouped tuple, or an end-of-record/EOP marker while a
     * group is still being accumulated.
     *
     * Driven by records arriving from the mapper (the base relation). Each
     * incoming record is pushed onto the heap; the heap is then drained until a
     * key change is observed, pulling replacement tuples from the side loaders
     * as their entries are consumed. Once the mapper is exhausted
     * (STATUS_EOP with endOfAllInput/endOfInput set), remaining heap contents —
     * bounded by firstKeyOfNextSplit — are flushed the same way.
     *
     * @return STATUS_OK with the (key, bags...) tuple on a key change;
     *         endOfRecordMark with null result while accumulating;
     *         the input result unchanged on EOP/error.
     * @throws ExecException on any IOException from loaders or index reading.
     */
    @Override
    public Result getNextTuple() throws ExecException {

        try{
            if(createNewBags){
                // This implies we just generated output tuple in last call.
                // So, its time to create new bags.
                for (int i=0; i < relationCnt; i++)
                    outBags[i] = new InternalCachedBag(relationCnt);
                createNewBags = false;
            }

            // Get the tuple from predecessor.
            Result baseInp = processInput();
            Tuple rearranged;

            switch (baseInp.returnStatus) {

            case POStatus.STATUS_OK:
                rearranged = applyLRon((Tuple)baseInp.result, 0);
                break;

            case POStatus.STATUS_EOP:
                // EOP before end-of-all-input just means "no record yet".
                if(!(this.parentPlan.endOfAllInput || isEndOfInput()))
                    return baseInp;

                // Final output already produced; nothing left to do.
                if(lastTime)
                    return baseInp;

                // We got all the records from mapper but have not yet generated
                // all the outputs that we need to, so we continue filling
                // and draining the heap from side loaders.
                while(!heap.isEmpty()){

                    Tuple topOfHeap = heap.peek();

                    if(needToBreak(topOfHeap, prevTopOfHeap))
                        break;

                    workingOnNewKey = false;
                    topOfHeap = heap.poll();
                    byte relIdx = (Byte)topOfHeap.get(0);
                    prevTopOfHeap = topOfHeap;
                    outBags[relIdx].add((Tuple)topOfHeap.get(2));

                    if(relIdx == 0) // We got all the mapper has to offer.
                        continue;

                    // Refill the heap from the side loader whose tuple we
                    // just consumed.
                    Tuple nxtTuple = sideLoaders.get(relIdx-1).getNext();
                    if(nxtTuple == null)
                        continue;

                    Tuple rearrangedTup = applyLRon(nxtTuple, relIdx);
                    Object key = rearrangedTup.get(1);
                    // Only keep keys that belong to this split (or any key if
                    // this is the last split / the key is null).
                    if(null == firstKeyOfNextSplit || null == key || firstKeyOfNextSplit.compareTo(key) > 0){
                        heap.offer(rearrangedTup);
                    }
                }

                // This is the last output we will produce, if heap is empty
                // or we got ahead of the first key of the next split.
                if(heap.isEmpty() || (firstKeyOfNextSplit != null && firstKeyOfNextSplit.compareTo(heap.peek().get(1)) <= 0))
                    lastTime = true;

                return getOutputTuple();

            default: // In case of errors, we return the tuple as it is.
                return baseInp;
            }

            // Every time we read something valid from mapper, we add it to heap.
            heap.offer(rearranged);

            if(firstTime){
                setup(rearranged);
                firstTime = false;
                // Heap is initialized with first record from all the relations.
            }

            // Algorithm is as following:

            /* while (there are records in heap) {
              peek at top record from heap
              if we see a key change from previous run, break to return output.
              else,
                 pull tuple from top of heap and place it in bag based on which input it came from
                 pull record from input that last record from top of heap came from
                 if (key pulled < first key in next split) insert into heap
               }
               output final record
             */

            while(!heap.isEmpty()){

                Tuple topOfHeap = heap.peek();

                if(needToBreak(topOfHeap, prevTopOfHeap))
                    break;

                workingOnNewKey = false;
                topOfHeap = heap.poll();
                byte relIdx = (Byte)topOfHeap.get(0);
                prevTopOfHeap = topOfHeap;
                outBags[relIdx].add((Tuple)topOfHeap.get(2));

                // Pull tuple from the corresponding loader.
                if(relIdx == 0){
                    Tuple tuple = heap.peek();
                    /* At this point, its possible to return the output tuple.
                     * So, lets check if we can do so.
                     * Remember, that for tuples having equal keys, tuple from
                     * leftmost relation is considered largest. So, if the tuple
                     * we just peeked from heap is of leftmost relation and has
                     * a different key then the tuple we polled last, then we can be
                     * assured that we have seen all the tuples belonging to this
                     * key and thus we can generate output tuple.
                     * Else, just pull the next tuple from the left relation by
                     * returning EOP.
                     */
                    // First check if there is a tuple at top of heap and is from
                    // left relation.
                    if( (null != tuple) && (Byte)tuple.get(0) == 0){
                        // Is key null for both tuples.
                        if(prevTopOfHeap.get(1) == null && tuple.get(1) == null){
                            return new Result(endOfRecordMark,null);
                        }
                        // Does key change from non-null to null or from one
                        // non-null to another non-null.
                        if((prevTopOfHeap.get(1) == null && tuple.get(1) != null) || !tuple.get(1).equals(prevTopOfHeap.get(1))){
                            return getOutputTuple();
                        }
                    }
                    // Either top of heap is from different relation or it is
                    // from left relation but having the same key.
                    return new Result(endOfRecordMark,null);
                }

                Tuple nxtTuple = sideLoaders.get(relIdx-1).getNext();
                if(nxtTuple == null) // EOF for this relation
                    continue;

                Tuple rearrangedTup = applyLRon(nxtTuple, relIdx);
                Object key = rearrangedTup.get(1);

                // Only keep the tuple if its key belongs to this split.
                if(firstKeyOfNextSplit == null || null == key || firstKeyOfNextSplit.compareTo(key) > 0){
                    heap.offer(rearrangedTup);
                }
            }

            return getOutputTuple();
        }

        catch(IOException ioe){
            throw new ExecException(ioe);
        }
    }

    /**
     * Builds the output tuple (key, bag-of-relation-0, ..., bag-of-relation-N)
     * for the group just completed, and resets per-group state so the next
     * call starts a fresh key with fresh bags.
     */
    private Result getOutputTuple() throws ExecException{

        workingOnNewKey = true;
        createNewBags = true;

        Tuple out =  mTupleFactory.newTuple(relationCnt+1);

        // Field 0 is the group key (taken from the last polled heap tuple),
        // followed by one bag per relation.
        out.set(0, prevTopOfHeap.get(1));
        for(int i=0; i < relationCnt; i++)
            out.set(i+1,(outBags[i]));

        return new Result(POStatus.STATUS_OK, out);
    }


    /**
     * Decides whether draining the heap must stop because the key at the top
     * of the heap differs from the key of the group being accumulated.
     *
     * Null keys are equal only when they come from the same relation — nulls
     * from different relations never group together.
     */
    private boolean needToBreak(final Tuple curTopOfHeap, final Tuple prevTopOfHeap) throws ExecException{

        if(workingOnNewKey)
            // This implies we just returned an output tuple in previous call
            // so we are now working on a fresh key.
            return false;

        Object curKey = curTopOfHeap.get(1);
        Object prevKey = prevTopOfHeap.get(1);

        if (curKey == null && null == prevKey)
            // If both keys are nulls, check if they are coming from same relation.
            // because nulls from different relations are not considered equal.
            return ! ((Byte)curTopOfHeap.get(0)).equals((Byte)prevTopOfHeap.get(0));


        if(curKey == null || null == prevKey)
            // If only one of them is null, then key has changed from null to non-null.
            return true;

        // Both the keys are non-null.
        return ! curKey.equals(prevKey);
    }


    /**
     * One-time initialization, run on the first tuple seen from the mapper:
     * determines this map's split index, reads the split index file, records
     * the first key of the next split, and opens/positions every side loader
     * (seeking to the first base key for non-first splits).
     *
     * @param firstRearrangedTup first base-relation tuple, already rearranged,
     *                           whose key is used to seek the side loaders.
     */
    @SuppressWarnings( "unchecked")
    private void setup(Tuple firstRearrangedTup) throws IOException{

        // Read our own split Index.
        int  curSplitIdx = 0;
        if (PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX)!=null) {
            // Split index was stashed in the configuration (non-MR engines,
            // presumably — TODO confirm which engines set this).
            curSplitIdx = Integer.parseInt(PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX));
        } else {
            // MR path: ask the PigSplit directly.
            curSplitIdx = ((PigSplit)((Context)PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
        }
        Object firstBaseKey = firstRearrangedTup.get(1);
        List<Pair<Integer,Tuple>> index = readIndex();

        // If we are in last split, firstKeyOfNextSplit is marked as null.
        // Null value of firstKeyOfNextSplit is used to indicate collect all
        // tuples from both base loaders as well as side loaders
        // and process them in this map.
        // Note that nulls are smaller then anything else. So, if there are
        // nulls in data, they will be in very first split. So, null value of
        // this variable can be used to determine whether we are working in last
        // split or not.

        firstKeyOfNextSplit = getFirstKeyOfNextSplit(curSplitIdx, index);

        // Open all other streams.
        // If this is first split, start from very first record.
        // For all other splits, bind to the first key which is greater
        // then or equal to the first key of the map.

        for(int i=0; i < relationCnt-1; i ++){

            LoadFunc loadfunc = (LoadFunc)PigContext.instantiateFuncFromSpec(sidFuncSpecs.get(i));
            loadfunc.setUDFContextSignature(loaderSignatures.get(i));
            Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConfInternal.get()));
            loadfunc.setLocation(sideFileSpecs.get(i), dummyJob);
            ((IndexableLoadFunc)loadfunc).initialize(dummyJob.getConfiguration());
            sideLoaders.add(loadfunc);
            Tuple rearranged;

            // index.get(0) is the index entry of the first split, so this
            // tests whether the current split is the first one.
            if ( index.get(0).first.equals(curSplitIdx)){
                // This is a first split, bind at very first record in all side relations.
                Tuple t = loadfunc.getNext();
                if(null == t) // This side relation is entirely empty.
                    continue;
                rearranged = applyLRon(t, i+1);
                heap.offer(rearranged);
                continue;
            }
            else{
                // This is not a first split, we need to bind to the key equal
                // to the firstBaseKey or next key thereafter.

                // First seek close to base key.
                ((IndexableLoadFunc)loadfunc).seekNear(firstBaseKey instanceof
                        Tuple ? (Tuple) firstBaseKey : mTupleFactory.newTuple(firstBaseKey));

                // Since contract of IndexableLoadFunc is not clear where we
                // will land up after seekNear() call,
                // we start reading from side loader to get to the point where key
                // is actually greater or equal to base key.
                while(true){
                    Tuple t = loadfunc.getNext();
                    if(t==null) // This relation has ended.
                        break;
                    rearranged = applyLRon(t, i+1);
                    if(rearranged.get(1) == null) // If we got a null key here
                        continue;                 // it implies we are still behind.

                    int cmpVal = ((Comparable<Object>)rearranged.get(1)).compareTo(firstBaseKey);
                    if(cmpVal >= 0){ // Break away as soon as we get ahead.

                        // Add this tuple in heap only if it needs to be processed in
                        // this map. That is it needs to be smaller then next split's
                        // first key, unless this is the last split, in which case it
                        // will be processed in this map.
                        if(firstKeyOfNextSplit == null ||  firstKeyOfNextSplit.compareTo(rearranged.get(1)) > 0 ){
                            heap.offer(rearranged);
                        }
                        break;
                    }
                }
            }
        }
    }

    /**
     * Reads the side-file index (one entry per base-relation split) and
     * returns it as (split index, key tuple) pairs, preserving file order.
     */
    private List<Pair<Integer,Tuple>> readIndex() throws ExecException{

        // Assertions on index we are about to read:
        // We are reading index from a file through POLoad which will return tuples.
        // These tuples looks as follows:
        // (key1, key2, key3,..,WritableComparable(wc),splitIdx(si))
        // Index is sorted on key1, then on key2, key3.. , wc, si.
        // Index contains exactly one entry per split.
        // Since this is loaded through CollectableLoadFunc, keys can't repeat.
        // Thus, each index entry contains unique key.
        // Since nulls are smaller then anything else, if its in data, key with null value
        // should be very first entry of index.

        // First create a loader to read index.
        POLoad ld = new POLoad(new OperatorKey(this.mKey.scope,NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
                new FileSpec(indexFileName, idxFuncSpec));

        // Index file is distributed through Distributed Cache to all mappers. So, read it locally.
        Properties props = ConfigurationUtil.getLocalFSProperties();
        ld.setPc(new PigContext(ExecType.LOCAL, props));

        // Each index entry is read as a pair of split index and a tuple consisting of key.
        List<Pair<Integer,Tuple>> index = new ArrayList<Pair<Integer,Tuple>>();

        for(Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld.getNextTuple()){

            Tuple  idxTuple = (Tuple)res.result;
            // Last two fields are WritableComparable and split index; the
            // rest form the (possibly compound) key.
            int colCnt = idxTuple.size()-2;
            Tuple keyTuple = mTupleFactory.newTuple(colCnt);

            for (int i=0; i< colCnt; i++)
                keyTuple.set(i, idxTuple.get(i));

            index.add(new Pair<Integer, Tuple>((Integer)idxTuple.get(colCnt+1), keyTuple));
        }

        return index;
    }

    /**
     * Returns the first key of the split following curSplitIdx, unwrapped to a
     * scalar when the key has a single column, or null when curSplitIdx is the
     * last split (see the comment in setup() on why null is unambiguous).
     */
    @SuppressWarnings("unchecked")
    private Comparable<Object> getFirstKeyOfNextSplit(final int curSplitIdx, final List<Pair<Integer,Tuple>> index) throws IOException{

        // First find out the index entry corresponding to our current split.
        int i;
        for(i=0; i < index.size(); i++){
            if(index.get(i).first.equals(curSplitIdx))
                break;
        }

        // Now read key of the very next index entry.
        if(i < index.size()-1){
            Tuple keyTuple = index.get(i+1).second;
            return keyTuple.size() == 1 ? (Comparable<Object>)keyTuple.get(0) : keyTuple;
        }

        // If we are here it implies, current split is the last split.
        return null;


    }

    /**
     * Runs the given tuple through the LocalRearrange of relation lrIdx,
     * producing the 3-field (relation index, key, full tuple) form used in
     * the heap (note the LRs have stripKeyFromValue disabled).
     */
    private Tuple applyLRon(final Tuple inp, final int lrIdx) throws ExecException{

        //Separate Key & Value of input using corresponding LR operator
        POLocalRearrange lr = LRs[lrIdx];
        lr.attachInput(inp);
        Result lrOut = lr.getNextTuple();

        if(lrOut.returnStatus!=POStatus.STATUS_OK){
            int errCode = 2167;
            String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
            throw new ExecException(errMsg,errCode,PigException.BUG);
        }
        lr.detachInput();
        // Re-wrap so the heap owns an independent tuple, not the LR's output.
        return mTupleFactory.newTuple(((Tuple)lrOut.result).getAll());
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitMergeCoGroup(this);
    }

    @Override
    public String name() {
        return getAliasString() + "MergeCogroup["
                + DataType.findTypeName(resultType) + "]" + " - "
                + mKey.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    public List<PhysicalPlan> getLRInnerPlansOf(int i) {
        return this.LRs[i].getPlans();
    }

    public void setSideLoadFuncs(List<FuncSpec> sideLoadFuncs) {
        this.sidFuncSpecs = sideLoadFuncs;
    }

    public void setSideFileSpecs(List<String> sideFileSpecs) {
        this.sideFileSpecs = sideFileSpecs;
    }

    public String getIndexFileName() {
        return indexFileName;
    }

    public void setIndexFileName(String indexFileName) {
        this.indexFileName = indexFileName;
    }

    public FuncSpec getIdxFuncSpec() {
        return idxFuncSpec;
    }

    public void setIdxFuncSpec(FuncSpec idxFileSpec) {
        this.idxFuncSpec = idxFileSpec;
    }

    public void setLoaderSignatures(List<String> loaderSignatures) {
        this.loaderSignatures = loaderSignatures;
    }

    /**
     * Restores all transient state after deserialization on the backend:
     * recreates the tuple factory, the heap with its key comparator, the
     * per-group flags, the output bags array and the side-loader list.
     */
    private void readObject(final ObjectInputStream is) throws IOException,
    ClassNotFoundException, ExecException {

        is.defaultReadObject();
        mTupleFactory = TupleFactory.getInstance();
        this.heap = new PriorityQueue<Tuple>(11, new Comparator<Tuple>() {

            @SuppressWarnings("unchecked")
            @Override
            public int compare(final Tuple resLHS, final Tuple resRHS) {
                try {
                    // First, about null keys.
                    // Java's priority queue doesn't handle null, so we handle it ourselves.

                    // If both keys are null, then keys of side loader is considered smaller.
                    // That is so, because when we poll the heap, we want to get the keys
                    // of side loader first then the keys of base loader.

                    // If null is compared against non-null, null is smaller.

                    Object leftKey = resLHS.get(1);
                    Object rightKey = resRHS.get(1);

                    if (null == leftKey && null == rightKey){
                        // Reversed operands: higher relation index sorts first.
                        return ((Byte)resRHS.get(0)).compareTo((Byte)resLHS.get(0));
                    }
                    if(null == leftKey)
                        return -1;
                    if(null == rightKey)
                        return 1;

                    // Now, about non-null keys.
                    // Compare the keys which are at index 1 of tuples being
                    // put in heap.

                    int cmpval = ((Comparable<Object>)leftKey).compareTo(rightKey);

                    // If keys are equal, tuple from side relations
                    // are considered smaller. This is so because we want
                    // to get back tuple of the mapper last when polling tuples
                    // from heap. And index of mapped relation is 0.

                    return cmpval == 0 ? ((Byte)resRHS.get(0)).compareTo((Byte)resLHS.get(0)) : cmpval;
                } catch (ExecException e) {

                    // Alas, no choice but to throw Runtime exception.
                    String errMsg = "Exception occurred in compare() of heap in POMergeCogroup.";
                    throw new RuntimeException(errMsg,e);
                }
            }
        });

        this.createNewBags = true;
        this.lastTime = false;
        this.relationCnt = LRs.length;
        this.outBags = new DataBag[relationCnt];
        this.firstTime = true;
        this.workingOnNewKey = true;
        this.sideLoaders = new ArrayList<LoadFunc>();
    }

    // This function is only for debugging. Call it whenever you want to print
    // the current state of heap.
    int counter = 0;
    private void printHeap(){
        System.out.println("Printing heap :"+ ++counter);
        // Copy so the real heap's contents are untouched while draining.
        PriorityQueue<Tuple> copy = new PriorityQueue<Tuple>(heap);
        System.out.println("Heap size: "+heap.size());
        int i =0;
        while(!copy.isEmpty()){
            System.out.println(i+++"th item in heap: "+ copy.poll());
        }
    }

    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        return null;
    }
}