blob: 982d70f91d35a694335bdbe58255e0dc69e2e094 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.pen.Illustrator;
/**
 * Packager used for joins. For each key, the first n-1 inputs are kept as
 * materialized bags while the last (streamed) input is handed to a delegated
 * {@link POOptimizedForEach} in chunks of at most {@code chunkSize} tuples.
 */
public class JoinPackager extends Packager {

    private static final long serialVersionUID = 1L;

    /** ForEach that produces join output from (key, input#1, ..., input#n). */
    private POOptimizedForEach forEach;

    /** True until getNext() has built the ForEach input for the current key. */
    private boolean newKey = true;

    /** ForEach input tuple (key, input#1, ..., input#n); input#n holds the current chunk. */
    private Tuple res = null;

    private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);

    public static final String DEFAULT_CHUNK_SIZE = "1000";

    /** Maximum number of input#n tuples attached to the ForEach at once. */
    private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE);

    /** Most recent result pulled from the ForEach. */
    private Result forEachResult;

    /** Bags placed into {@link #res}; the last slot is a reusable chunk buffer. */
    private DataBag[] dbs = null;

    /** Index of the streamed input (numInputs - 1). */
    private int lastBagIndex;

    /** Iterator over the streamed last input for the current key. */
    private Iterator<Tuple> lastBagIter;

    /**
     * Creates a JoinPackager that copies packaging settings from {@code p} and
     * delegates projection/flattening to a new ForEach configured from {@code f}.
     *
     * @param p packager whose key type, input count, inner flags, key info and
     *          key-shape flags are copied; may be null, in which case none are copied
     * @param f ForEach whose input plans and flatten flags are reused; its operator
     *          scope is used to name the internal ForEach, so it must not be null
     * @throws NullPointerException if {@code f} is null
     */
    public JoinPackager(Packager p, POForEach f) {
        super();
        if (f == null) {
            throw new NullPointerException("JoinPackager requires a non-null POForEach");
        }
        String scope = f.getOperatorKey().getScope();
        NodeIdGenerator nig = NodeIdGenerator.getGenerator();
        forEach = new POOptimizedForEach(new OperatorKey(scope, nig.getNextNodeId(scope)));
        if (p != null) {
            setKeyType(p.getKeyType());
            setNumInputs(p.getNumInputs());
            lastBagIndex = numInputs - 1;
            setInner(p.getInner());
            setKeyInfo(p.getKeyInfo());
            this.isKeyTuple = p.isKeyTuple;
            this.isKeyCompound = p.isKeyCompound;
        }
        setInputPlans(f.getInputPlans());
        setToBeFlattened(f.getToBeFlattened());
    }

    /**
     * Returns the next ForEach result for the currently attached key.
     *
     * The inputs for a key are n bags. The first n-1 are used as-is; input#n is
     * fed to the delegated ForEach a chunk at a time, so the ForEach sees
     * (key, input#1, input#2, ..., chunk of input#n) for each chunk. All
     * results are pulled from the ForEach for one chunk before the next chunk
     * is loaded. Because one output tuple is returned per call, the position
     * within input#n and within the ForEach's output is kept in fields between
     * calls.
     *
     * @return the next OK/ERR result from the ForEach, or EOP when this key
     *         produces no more output
     * @throws ExecException if the ForEach or a bag operation fails
     */
    @Override
    public Result getNext() throws ExecException {
        // On a new key, build res = (key, input#1, ..., input#n) where the
        // input#n slot is a buffer that is refilled with successive chunks.
        if (newKey) {
            // Recompute here in case numInputs was set after construction.
            lastBagIndex = numInputs - 1;

            dbs = new DataBag[numInputs];
            for (int i = 0; i < lastBagIndex; i++) {
                dbs[i] = bags[i];
            }
            // The last slot is always an in-memory NonSpillableDataBag sized for one chunk.
            dbs[lastBagIndex] = new NonSpillableDataBag((int) Math.min(chunkSize, Integer.MAX_VALUE));
            lastBagIter = bags[lastBagIndex].iterator();

            if (!lastBagIter.hasNext()) {
                // Input#n is empty. The last input is expected to be flattened,
                // so an empty bag removes this key from the output entirely.
                newKey = true;
                return eopResult;
            }

            res = mTupleFactory.newTuple(numInputs + 1);
            for (int i = 0; i < dbs.length; i++) {
                res.set(i + 1, dbs[i]);
            }
            res.set(0, key);

            // An inner input with an empty bag means this key produces nothing.
            for (int i = 0; i < dbs.length - 1; i++) {
                if (inner[i] && dbs[i].size() == 0) {
                    detachInput();
                    return eopResult;
                }
            }
            newKey = false;
        }

        // Keep feeding chunks to the ForEach until it yields a result or both
        // input#n and the ForEach's pending output are exhausted.
        while (lastBagIter.hasNext() || forEach.processingPlan) {
            // Drain output still pending from the previously attached chunk.
            // An EOP only leaves the switch. NOTE(review): this relies on the
            // ForEach clearing processingPlan when it reports EOP; otherwise
            // this inner loop would not terminate.
            while (forEach.processingPlan) {
                forEachResult = forEach.getNextTuple();
                switch (forEachResult.returnStatus) {
                case POStatus.STATUS_OK:
                case POStatus.STATUS_ERR:
                    return forEachResult;
                case POStatus.STATUS_NULL:
                    continue;
                case POStatus.STATUS_EOP:
                    break;
                }
            }

            if (!lastBagIter.hasNext()) {
                detachInput();
                return eopResult;
            }

            // Replace the previous chunk with up to chunkSize tuples of input#n.
            dbs[lastBagIndex].clear();
            for (long i = 0; i < chunkSize && lastBagIter.hasNext(); i++) {
                dbs[lastBagIndex].add(lastBagIter.next());
            }

            forEach.attachInput(res);
            forEachResult = forEach.getNextTuple();
            switch (forEachResult.returnStatus) {
            case POStatus.STATUS_OK:
            case POStatus.STATUS_ERR:
                return forEachResult;
            case POStatus.STATUS_NULL:
                continue;
            case POStatus.STATUS_EOP:
                break;
            }
        }
        detachInput();
        return eopResult;
    }

    /**
     * Attaches the inputs for one key. Every input except the last that is
     * marked read-once is copied into a bag from {@code getBag()}; the last
     * input must be read-once because it is streamed in chunks by getNext().
     *
     * @param key the group key
     * @param bags one bag per input; entries 0..length-2 may be replaced in place
     * @param readOnce per-input flag marking inputs that can only be iterated once
     * @throws ExecException if the last input is not marked read-once
     */
    @Override
    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
            throws ExecException {
        checkBagType();
        this.key = key;
        this.bags = bags;
        this.readOnce = readOnce;
        // Validate before materializing so a bad configuration fails without
        // first copying the other inputs into memory.
        if (!readOnce[numInputs - 1]) {
            throw new ExecException(
                    "JoinPackager expects the last input to be streamed");
        }
        for (int i = 0; i < bags.length - 1; i++) {
            if (readOnce[i]) {
                DataBag materializedBag = getBag();
                materializedBag.addAll(bags[i]);
                bags[i] = materializedBag;
            }
        }
        this.newKey = true;
    }

    /**
     * @return the input plans of the delegated ForEach
     */
    public List<PhysicalPlan> getInputPlans() {
        return forEach.getInputPlans();
    }

    /**
     * @param plans input plans to install on the delegated ForEach
     */
    public void setInputPlans(List<PhysicalPlan> plans) {
        forEach.setInputPlans(plans);
    }

    /**
     * @param flattens per-plan flatten flags to install on the delegated ForEach
     */
    public void setToBeFlattened(List<Boolean> flattens) {
        forEach.setToBeFlattened(flattens);
    }

    /**
     * @return the delegated ForEach
     */
    public POOptimizedForEach getForEach() {
        return forEach;
    }

    /**
     * Sets how many tuples of the last (streamed) input are attached to the
     * ForEach at a time.
     *
     * @param chunkSize the chunk size for the biggest input; must be at least 1
     * @throws IllegalArgumentException if {@code chunkSize} is less than 1, since
     *         a non-positive chunk would never advance through the last input
     */
    public void setChunkSize(long chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1, got " + chunkSize);
        }
        this.chunkSize = chunkSize;
    }

    /**
     * Sets the illustrator on this packager and on the delegated ForEach.
     */
    @Override
    public void setIllustrator(Illustrator illustrator) {
        this.illustrator = illustrator;
        forEach.setIllustrator(illustrator);
    }

    /**
     * @return the simple class name followed by the ForEach's flatten string
     */
    @Override
    public String name() {
        return this.getClass().getSimpleName() + "(" + forEach.getFlatStr() + ")";
    }
}