blob: cb85a5c8330dc979cae463d33c638fbcab84bd19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.Illustrator;
/**
 * The package operator that packages the globally rearranged tuples into
 * the output format required by co-group. This is the last stage of
 * processing a co-group. This operator has a slightly different contract
 * than other operators in that it takes two things as input: the key being
 * worked on and an iterator of bags that contain indexed tuples which just
 * need to be packaged into their appropriate output bags based on the
 * index.
 */
public class POPackage extends PhysicalOperator {
    private static final long serialVersionUID = 1L;

    // The iterator of indexed tuples that is typically provided by Hadoop.
    transient Iterator<NullableTuple> tupIter;

    // The key being worked on.
    protected Object key;

    // The number of inputs to this co-group. 0 indicates a distinct, which
    // means there will only be a key, no value.
    protected int numInputs;

    // Whether the last input's bag may be handed to the packager as a
    // read-once wrapper. Disabled for multi-query (see getNextTuple) to
    // avoid materializing it again into another InternalCachedBag.
    private boolean lastBagReadOnly = true;

    // Strategy object that turns the attached (key, bags) into output tuples.
    protected Packager pkgr;

    // The writable form of the current key; carries the input index and is
    // needed by the packager to reconstruct value tuples.
    protected PigNullableWritable keyWritable;

    // One-time setup state, resolved lazily on the first getNextTuple() call.
    private transient boolean initialized;
    private transient boolean useDefaultBag;
    private transient int accumulativeBatchSize;

    public POPackage(OperatorKey k) {
        this(k, -1, null);
    }

    public POPackage(OperatorKey k, int rp) {
        this(k, rp, null);
    }

    public POPackage(OperatorKey k, List<PhysicalOperator> inp) {
        this(k, -1, inp);
    }

    public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        this(k, rp, inp, new Packager());
    }

    public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
            Packager pkgr) {
        super(k, rp, inp);
        numInputs = -1;
        this.pkgr = pkgr;
    }

    /**
     * Copy constructor. Note that the Packager is shared with the source
     * operator, not deep-copied; use {@link #clone()} for a deep copy.
     */
    public POPackage(POPackage copy) {
        super(copy);
        this.numInputs = copy.numInputs;
        this.pkgr = copy.pkgr;
        this.pkgr.keyInfo = copy.pkgr.keyInfo;
    }

    @Override
    public void setIllustrator(Illustrator illustrator) {
        super.setIllustrator(illustrator);
        // Propagate to the packager so illustrate mode tracks its output too.
        pkgr.setIllustrator(illustrator);
    }

    @Override
    public String name() {
        return getAliasString() + "Package" + "(" + pkgr.name() + ")" + "["
                + DataType.findTypeName(resultType) + "]" + "{"
                + DataType.findTypeName(pkgr.getKeyType()) + "}" + " - "
                + mKey.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitPackage(this);
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    /**
     * Attaches the required inputs.
     * @param k the key being worked on
     * @param inp iterator of indexed tuples, typically obtained from Hadoop
     * @throws RuntimeException wrapping any failure to extract the key
     */
    public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
        try {
            tupIter = inp;
            key = pkgr.getKey(k);
            keyWritable = k;
            inputAttached = true;
        } catch (Exception e) {
            throw new RuntimeException(
                    "Error attaching input for key " + k +
                    " in " + name() + " at location " + getOriginalLocations(), e);
        }
    }

    /**
     * attachInput's better half!
     */
    @Override
    public void detachInput() {
        tupIter = null;
        key = null;
        inputAttached = false;
    }

    public int getNumInps() {
        return numInputs;
    }

    public void setNumInps(int numInps) {
        this.numInputs = numInps;
        // Keep the packager's view of the input count in sync.
        pkgr.setNumInputs(numInps);
    }

    /**
     * From the inputs, constructs the output tuple for this co-group in the
     * required format which is (key, {bag of tuples from input 1}, {bag of
     * tuples from input 2}, ...)
     */
    @Override
    public Result getNextTuple() throws ExecException {
        if (!initialized) {
            initialized = true;
            // Honor pig.cached.bag.type=default to force plain in-memory bags
            // instead of spill-capable InternalCachedBags.
            if (PigMapReduce.sJobConfInternal.get() != null) {
                String bagType = PigMapReduce.sJobConfInternal.get().get(
                        PigConfiguration.PIG_CACHEDBAG_TYPE);
                if (bagType != null && bagType.equalsIgnoreCase("default")) {
                    useDefaultBag = true;
                }
            }
            accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
            // If multiquery, the last bag is InternalCachedBag and should not
            // set ReadOnly flag, otherwise we will materialize again to another
            // InternalCachedBag
            if (pkgr instanceof MultiQueryPackager) {
                lastBagReadOnly = false;
            }
        }
        // Deliberately shadows the field: for merged (multi-query) packages
        // the input count can depend on the key's index.
        int numInputs = pkgr.getNumInputs(keyWritable.getIndex());
        boolean[] readOnce = new boolean[numInputs];
        for (int i = 0; i < numInputs; i++)
            readOnce[i] = false;
        if (isInputAttached()) {
            // Create numInputs bags
            DataBag[] dbs = null;
            dbs = new DataBag[numInputs];
            if (isAccumulative()) {
                readOnce[numInputs - 1] = false;
                // create bag wrapper to pull tuples in many batches
                // all bags have reference to the sample tuples buffer
                // which contains tuples from one batch
                POPackageTupleBuffer buffer = new POPackageTupleBuffer(accumulativeBatchSize, key, tupIter);
                for (int i = 0; i < numInputs; i++) {
                    dbs[i] = new AccumulativeBag(buffer, i);
                }
            } else {
                if (lastBagReadOnly) {
                    readOnce[numInputs - 1] = true;
                }
                // We know the tuples will come sorted by index, so we can wrap
                // the last input in a ReadOnceBag and let the Packager decide
                // whether or not to read into memory
                // create bag to pull all tuples out of iterator
                for (int i = 0; i < numInputs; i++) {
                    dbs[i] = useDefaultBag ? mBagFactory.newDefaultBag()
                            // In a very rare case if there is a POStream after this
                            // POPackage in the pipeline and is also blocking the
                            // pipeline;
                            // constructor argument should be 2 * numInputs. But for one
                            // obscure
                            // case we don't want to pay the penalty all the time.
                            : new InternalCachedBag(numInputs);
                }
                // For each indexed tup in the inp, sort them
                // into their corresponding bags based
                // on the index
                while (tupIter.hasNext()) {
                    NullableTuple ntup = tupIter.next();
                    int index = ntup.getIndex();
                    if (index == numInputs - 1) {
                        if (pkgr.getUseSecondaryKey()) {
                            // With a secondary key the last input cannot be
                            // streamed; fall through and materialize it.
                            if (dbs[index] == null) {
                                dbs[index] = useDefaultBag ? mBagFactory
                                        .newDefaultBag() : new InternalCachedBag(numInputs);
                            }
                        } else {
                            // First tuple of the last input: hand the rest of
                            // the iterator over as a lazy, read-once bag.
                            dbs[index] = new PeekedBag(pkgr, ntup, tupIter, keyWritable);
                            break;
                        }
                    }
                    Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
                    if (numInputs == 1) {
                        // this is for multi-query merge where
                        // the numInputs is always 1, but the index
                        // (the position of the inner plan in the
                        // enclosed operator) may not be 1.
                        dbs[0].add(copy);
                    } else {
                        dbs[index].add(copy);
                    }
                    // Report progress so the task is not killed while
                    // draining a large group.
                    if (getReporter() != null) {
                        getReporter().progress();
                    }
                }
            }
            // Construct the output tuple by appending
            // the key and all the above constructed bags
            // and return it.
            pkgr.attachInput(key, dbs, readOnce);
            detachInput();
        }
        return pkgr.getNext();
    }

    public Packager getPkgr() {
        return pkgr;
    }

    public void setPkgr(Packager pkgr) {
        this.pkgr = pkgr;
        pkgr.setParent(this);
        pkgr.setIllustrator(illustrator);
    }

    /**
     * Make a deep copy of this operator.
     * @throws CloneNotSupportedException
     */
    @Override
    public POPackage clone() throws CloneNotSupportedException {
        POPackage clone = (POPackage)super.clone();
        // Fresh operator key so the clone is a distinct plan node.
        clone.mKey = new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
        clone.requestedParallelism = requestedParallelism;
        clone.resultType = resultType;
        clone.numInputs = numInputs;
        clone.pkgr = (Packager) this.pkgr.clone();
        return clone;
    }

    /**
     * Buffer shared by all AccumulativeBags of one group: pulls tuples from
     * the underlying iterator in batches of {@code batchSize} and fans them
     * out per input index, so accumulative UDFs can process a group
     * incrementally.
     */
    public class POPackageTupleBuffer implements AccumulativeTupleBuffer {
        // Per-input staging lists holding the current batch only.
        private List<Tuple>[] bags;
        private Iterator<NullableTuple> iter;
        private int batchSize;
        private Object currKey;

        @SuppressWarnings("unchecked")
        public POPackageTupleBuffer(int batchSize, Object key, Iterator<NullableTuple> iter) {
            this.batchSize = batchSize;
            this.currKey = key;
            this.iter = iter;
            this.bags = new List[numInputs];
            for(int i=0; i<numInputs; i++) {
                this.bags[i] = new ArrayList<Tuple>(batchSize);
            }
        }

        @Override
        public boolean hasNextBatch() {
            return iter.hasNext();
        }

        @Override
        public void nextBatch() throws IOException {
            // Drop the previous batch and refill with up to batchSize tuples.
            for(int i=0; i<bags.length; i++) {
                bags[i].clear();
            }
            // Restore the enclosing operator's key, which detachInput() may
            // have nulled between batches.
            key = currKey;
            for(int i=0; i<batchSize; i++) {
                if (iter.hasNext()) {
                    NullableTuple ntup = iter.next();
                    int index = ntup.getIndex();
                    Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
                    if (numInputs == 1) {
                        // this is for multi-query merge where
                        // the numInputs is always 1, but the index
                        // (the position of the inner plan in the
                        // enclosed operator) may not be 1.
                        bags[0].add(copy);
                    } else {
                        bags[index].add(copy);
                    }
                }else{
                    break;
                }
            }
        }

        @Override
        public void clear() {
            for(int i=0; i<bags.length; i++) {
                bags[i].clear();
            }
            iter = null;
        }

        @Override
        public Iterator<Tuple> getTuples(int index) {
            return bags[index].iterator();
        }

        public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
            return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
        }
    }

    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        return pkgr.illustratorMarkup(in, out, eqClassIndex);
    }

    public int numberOfEquivalenceClasses() {
        return pkgr.numberOfEquivalenceClasses();
    }

    // A ReadOnceBag that we've already "peeked" at: the first tuple of the
    // last input was consumed to detect the input boundary, so it is stashed
    // as "head" and replayed before the rest of the iterator.
    private static class PeekedBag extends ReadOnceBag {
        private static final long serialVersionUID = 1L;
        NullableTuple head;
        int index;

        public PeekedBag(Packager pkgr, NullableTuple head,
                Iterator<NullableTuple> tupIter,
                PigNullableWritable keyWritable) {
            super(pkgr, tupIter, keyWritable);
            this.head = head;
            this.index = head.getIndex();
        }

        @Override
        public Iterator<Tuple> iterator() {
            return new Iterator<Tuple>() {
                boolean headReturned = false;

                @Override
                public boolean hasNext() {
                    if (!headReturned)
                        return true;
                    return tupIter.hasNext();
                }

                @Override
                public Tuple next() {
                    if (!headReturned) {
                        headReturned = true;
                        try {
                            return pkgr.getValueTuple(keyWritable, head,
                                    head.getIndex());
                        } catch (ExecException e) {
                            // Chain the cause so the original stack trace is
                            // preserved (was previously flattened to a string).
                            throw new RuntimeException(
                                    "PeekedBag failed to get value tuple", e);
                        }
                    }
                    NullableTuple ntup = tupIter.next();
                    Tuple ret = null;
                    try {
                        ret = pkgr.getValueTuple(keyWritable, ntup, index);
                    } catch (ExecException e) {
                        throw new RuntimeException(
                                "PeekedBag failed to get value tuple", e);
                    }
                    // Report progress while draining a potentially large bag.
                    if (getReporter() != null) {
                        getReporter().progress();
                    }
                    return ret;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException(
                            "PeekedBag does not support removal");
                }
            };
        }
    }
}