blob: 805f21a1985ac901ff2ffc6bebe574f383c754e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.pen.Illustrable;
import org.apache.pig.pen.Illustrator;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.LineageTracer;
/**
 * Base packager: given a key and the bags attached for that key, builds the
 * output tuple. For a distinct the output is the one-field tuple
 * {@code (key)}; otherwise it is the key followed by one field per attached
 * bag.
 *
 * <p>The class also knows how to rebuild a complete "value" tuple from the
 * partial value and the key (see {@link #getValueTuple}), and carries the
 * illustrator hooks used to track lineage of example tuples.
 */
public class Packager implements Illustrable, Serializable, Cloneable {

    private static final long serialVersionUID = 1L;

    // Per-bag flag handed in through attachInput(); a bag flagged true is
    // copied into a bag obtained from getBag() before use.
    protected boolean[] readOnce;

    // The bags currently attached; null when no input is attached.
    protected DataBag[] bags;

    public enum PackageType {
        GROUP, JOIN, BLOOMJOIN
    }

    protected transient Illustrator illustrator = null;

    // The key being worked on
    protected Object key;

    // marker to indicate if key is a tuple
    protected boolean isKeyTuple = false;

    // marker to indicate if the tuple key is compound in nature
    protected boolean isKeyCompound = false;

    // key's type
    protected byte keyType;

    // The number of inputs to this
    // co-group. 0 indicates a distinct, which means there will only be a
    // key, no value.
    int numInputs;

    // If the attaching map-reduce plan use secondary sort key
    boolean useSecondaryKey = false;

    // Denotes if inner is specified
    // on a particular input
    boolean[] inner;

    // flag to denote whether there is a distinct
    // leading to this package
    protected boolean distinct = false;

    // A mapping of input index to key information got from LORearrange
    // for that index. The Key information is a pair of boolean, Map.
    // The boolean indicates whether there is a lone project(*) in the
    // cogroup by. If not, the Map has a mapping of column numbers in the
    // "value" to column numbers in the "key" which contain the fields in
    // the "value"
    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;

    private PackageType pkgType;

    // Both flags are transient, so checkBagType() re-reads the job
    // configuration on a freshly deserialized instance.
    private transient boolean initialized;
    private transient boolean useDefaultBag;

    // Passed to the illustrator when the equivalence classes are registered.
    protected POPackage parent = null;

    protected static final BagFactory mBagFactory = BagFactory.getInstance();
    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();

    /**
     * Extracts the key object from the given writable.
     *
     * @param key the writable holding the key
     * @return the writable's value as a Pig type; when a secondary key is in
     *         use, field 0 of that value (which is then cast to a tuple)
     * @throws ExecException if field 0 cannot be read from the key tuple
     */
    public Object getKey(PigNullableWritable key) throws ExecException {
        Object keyObject = key.getValueAsPigType();
        if (useSecondaryKey) {
            return ((Tuple) keyObject).get(0);
        }
        return keyObject;
    }

    /**
     * Attaches the key and bags for the next call to {@link #getNext()}.
     * Entries of {@code bags} flagged in {@code readOnce} are replaced, in
     * the passed array itself, by a bag from {@link #getBag()} holding the
     * same contents.
     *
     * @param key the key for this group
     * @param bags one bag per input; modified in place for read-once entries
     * @param readOnce per-bag flag, indexed like {@code bags}
     * @throws ExecException declared for subclasses and bag operations
     */
    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
            throws ExecException {
        checkBagType();

        this.key = key;
        this.bags = bags;
        this.readOnce = readOnce;
        // We assume that we need all bags materialized. Specialized subclasses
        // may choose to handle this differently
        for (int i = 0; i < bags.length; i++) {
            if (readOnce[i]) {
                DataBag materializedBag = getBag();
                materializedBag.addAll(bags[i]);
                bags[i] = materializedBag;
            }
        }
    }

    /**
     * Builds the output for the currently attached input and detaches it.
     *
     * @return {@code STATUS_EOP} with a null result when nothing is attached;
     *         {@code STATUS_NULL} when an input marked inner has an empty
     *         bag; otherwise {@code STATUS_OK} with the output tuple
     * @throws ExecException if a field of the output tuple cannot be set
     */
    public Result getNext() throws ExecException {
        if (bags == null) {
            return new Result(POStatus.STATUS_EOP, null);
        }

        Tuple res;
        if (isDistinct()) {
            // only set the key which has the whole
            // tuple
            res = mTupleFactory.newTuple(1);
            res.set(0, key);
        } else {
            // Construct the output tuple by appending
            // the key and all the above constructed bags
            // and return it.
            res = mTupleFactory.newTuple(numInputs + 1);
            res.set(0, key);
            for (int i = 0; i < bags.length; i++) {
                DataBag bag = bags[i];
                // An empty bag on an inner input means no output for this key.
                if (inner[i] && bag.size() == 0) {
                    detachInput();
                    Result r = new Result();
                    r.returnStatus = POStatus.STATUS_NULL;
                    return r;
                }
                res.set(i + 1, bag);
            }
        }

        Result r = new Result();
        r.returnStatus = POStatus.STATUS_OK;
        // illustratorMarkup reads this.bags in the distinct case, so the
        // input may only be detached after it has run.
        r.result = illustratorMarkup(null, res, 0);
        detachInput();
        return r;
    }

    /**
     * Drops the references to the current key and bags.
     */
    public void detachInput() {
        key = null;
        bags = null;
    }

    /**
     * Lineage markup for a single input/output pair. When an illustrator is
     * set, {@code out} is wrapped in an {@link ExampleTuple} (unless it
     * already is one), registered with the lineage tracer, given the
     * {@code synthetic} flag of {@code in}, and unioned with {@code in}.
     *
     * @param in the source tuple; cast to {@link ExampleTuple} when an
     *        illustrator is set
     * @param out the produced tuple
     * @return the marked-up tuple, or {@code out} itself when no illustrator
     *         is set
     */
    protected Tuple illustratorMarkup2(Object in, Object out) {
        if (illustrator == null) {
            return (Tuple) out;
        }

        ExampleTuple tOut;
        if (out instanceof ExampleTuple) {
            tOut = (ExampleTuple) out;
        } else {
            tOut = new ExampleTuple((Tuple) out);
        }
        illustrator.getLineage().insert(tOut);
        tOut.synthetic = ((ExampleTuple) in).synthetic;
        illustrator.getLineage().union(tOut, (Tuple) in);
        return tOut;
    }

    /**
     * Lineage markup for the project-star case, where the value is rebuilt
     * from the key.
     *
     * @param key the key tuple the output was built from
     * @param val the value tuple received alongside the key
     * @param out the rebuilt value tuple
     * @return the marked-up tuple, or {@code out} itself when no illustrator
     *         is set
     */
    protected Tuple starMarkup(Tuple key, Tuple val, Tuple out) {
        if (illustrator == null) {
            return (Tuple) out;
        }

        Tuple copy = illustratorMarkup2(key, out);
        // For distinct, we also need to retain lineage information from the values.
        // NOTE(review): the second call is given `out`, not `copy`, so the
        // tuple returned for a distinct is the result of marking up `out`
        // against `val` only - confirm this is intended.
        if (isDistinct()) {
            copy = illustratorMarkup2(val, out);
        }
        return copy;
    }

    /**
     * Rebuilds the complete value tuple for one input from the value that
     * was received and the fields of it that live in the key.
     *
     * @param keyWritable the writable holding the key
     * @param ntup the writable holding the (possibly partial) value
     * @param index the input index used to look up {@link #keyInfo}
     * @return a newly created tuple holding the complete value
     * @throws ExecException if a field cannot be read or set
     */
    public Tuple getValueTuple(PigNullableWritable keyWritable,
            NullableTuple ntup, int index) throws ExecException {
        Object key = getKey(keyWritable);
        // Need to make a copy of the value, as hadoop uses the same ntup
        // to represent each value.
        Tuple val = (Tuple) ntup.getValueAsPigType();

        Tuple copy = null;
        // The "value (val)" that we just got may not
        // be the complete "value". It may have some portions
        // in the "key" (look in POLocalRearrange for more comments)
        // If this is the case we need to stitch
        // the "value" together.
        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = keyInfo.get(index);
        boolean isProjectStar = lrKeyInfo.first;
        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
        int keyLookupSize = keyLookup.size();

        if (keyLookupSize > 0) {
            // we have some fields of the "value" in the
            // "key".
            int finalValueSize = keyLookupSize + val.size();
            copy = mTupleFactory.newTuple(finalValueSize);
            int valIndex = 0; // an index for accessing elements from
                              // the value (val) that we have currently
            for (int i = 0; i < finalValueSize; i++) {
                Integer keyIndex = keyLookup.get(i);
                if (keyIndex == null) {
                    // the field for this index is not in the
                    // key - so just take it from the "value"
                    // we were handed
                    copy.set(i, val.get(valIndex));
                    valIndex++;
                } else {
                    // the field for this index is in the key
                    if (isKeyTuple && isKeyCompound) {
                        // the key is a tuple, extract the
                        // field out of the tuple
                        copy.set(i, ((Tuple) key).get(keyIndex));
                    } else {
                        copy.set(i, key);
                    }
                }
            }
            copy = illustratorMarkup2(val, copy);
        } else if (isProjectStar) {
            // the whole "value" is present in the "key"
            copy = mTupleFactory.newTuple(((Tuple) key).getAll());
            copy = starMarkup((Tuple) key, val, copy);
        } else {
            // there is no field of the "value" in the
            // "key" - so just make a copy of what we got
            // as the "value"
            copy = mTupleFactory.newTuple(val.getAll());
            copy = illustratorMarkup2(val, copy);
        }
        return copy;
    }

    /**
     * @return the key's type
     */
    public byte getKeyType() {
        return keyType;
    }

    /**
     * @param keyType the key's type to set
     */
    public void setKeyType(byte keyType) {
        this.keyType = keyType;
    }

    /**
     * @return the isKeyTuple
     */
    public boolean getKeyTuple() {
        return isKeyTuple;
    }

    /**
     * @return the current key cast to a tuple, or null when the key is not
     *         marked as a tuple
     */
    public Tuple getKeyAsTuple() {
        return isKeyTuple ? (Tuple) key : null;
    }

    /**
     * @return the key
     */
    public Object getKey() {
        return key;
    }

    /**
     * @return the per-input inner flags (the internal array, not a copy)
     */
    public boolean[] getInner() {
        return inner;
    }

    /**
     * @param inner the per-input inner flags; stored by reference
     */
    public void setInner(boolean[] inner) {
        this.inner = inner;
    }

    /**
     * @param keyInfo the keyInfo to set
     */
    public void setKeyInfo(
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
        this.keyInfo = keyInfo;
    }

    /**
     * @param keyTuple the keyTuple to set
     */
    public void setKeyTuple(boolean keyTuple) {
        this.isKeyTuple = keyTuple;
    }

    /**
     * @param keyCompound the keyCompound to set
     */
    public void setKeyCompound(boolean keyCompound) {
        this.isKeyCompound = keyCompound;
    }

    /**
     * @return the keyInfo
     */
    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
        return keyInfo;
    }

    /**
     * @return the illustrator, or null when none is set
     */
    public Illustrator getIllustrator() {
        return illustrator;
    }

    @Override
    public void setIllustrator(Illustrator illustrator) {
        this.illustrator = illustrator;
    }

    /**
     * @return the distinct
     */
    public boolean isDistinct() {
        return distinct;
    }

    /**
     * @param distinct the distinct to set
     */
    public void setDistinct(boolean distinct) {
        this.distinct = distinct;
    }

    /**
     * @param useSecondaryKey whether the key carries a secondary sort key
     */
    public void setUseSecondaryKey(boolean useSecondaryKey) {
        this.useSecondaryKey = useSecondaryKey;
    }

    /**
     * @return whether the key carries a secondary sort key
     */
    public boolean getUseSecondaryKey() {
        return useSecondaryKey;
    }

    /**
     * @param type the package type to set
     */
    public void setPackageType(PackageType type) {
        this.pkgType = type;
    }

    /**
     * @return the package type
     */
    public PackageType getPackageType() {
        return pkgType;
    }

    /**
     * @param index not used by this implementation
     * @return the number of inputs
     */
    public int getNumInputs(byte index) {
        return numInputs;
    }

    /**
     * @return the number of inputs
     */
    public int getNumInputs() {
        return numInputs;
    }

    /**
     * @param numInputs the number of inputs to set
     */
    public void setNumInputs(int numInputs) {
        this.numInputs = numInputs;
    }

    /**
     * Creates a copy of this packager. The {@code inner} array is duplicated
     * and {@code keyInfo} is copied into a new map whose values are shared
     * with the original; every other field is copied as by
     * {@link Object#clone()}, so object references are shared.
     *
     * @return the copy
     * @throws CloneNotSupportedException if {@code super.clone()} fails
     */
    @Override
    public Packager clone() throws CloneNotSupportedException {
        Packager clone = (Packager) super.clone();
        clone.setNumInputs(numInputs);
        clone.setPackageType(pkgType);
        clone.setDistinct(distinct);
        clone.inner = (inner != null) ? inner.clone() : null;
        if (keyInfo != null) {
            clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
                    keyInfo));
        }
        clone.setKeyCompound(isKeyCompound);
        clone.setKeyTuple(isKeyTuple);
        clone.setKeyType(keyType);
        clone.setUseSecondaryKey(useSecondaryKey);
        return clone;
    }

    /**
     * @return the simple name of the runtime class
     */
    public String name() {
        return this.getClass().getSimpleName();
    }

    /**
     * Lineage markup for the packaged output tuple. When an illustrator is
     * set, {@code out} is wrapped in an {@link ExampleTuple}, unioned with
     * the tuples it was built from, possibly added to the equivalence class
     * at {@code eqClassIndex}, and recorded as illustrator data.
     *
     * @param in not used by this implementation
     * @param out the packaged output tuple
     * @param eqClassIndex index of the equivalence class to add to
     * @return the marked-up tuple, or {@code out} itself when no illustrator
     *         is set
     * @throws RuntimeException wrapping the {@link ExecException} raised
     *         while reading a bag out of {@code out}
     */
    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        // All customized packagers are introduced during MRCompilaition.
        // Illustrate happens before that, so we only have to focus on the basic
        // POPackage
        if (illustrator == null) {
            return (Tuple) out;
        }

        ExampleTuple tOut = new ExampleTuple((Tuple) out);
        LineageTracer lineageTracer = illustrator.getLineage();
        lineageTracer.insert(tOut);
        boolean synthetic = false;

        // Register one empty equivalence class per input the first time
        // through.
        if (illustrator.getEquivalenceClasses() == null) {
            LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
            for (int i = 0; i < numInputs; ++i) {
                IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
                equivalenceClasses.add(equivalenceClass);
            }
            illustrator.setEquivalenceClasses(equivalenceClasses, parent);
        }

        if (isDistinct()) {
            int count = 0;
            for (Tuple tmp : bags[0]) {
                count++;
                if (!tmp.equals(tOut)) {
                    lineageTracer.union(tOut, tmp);
                }
            }
            // only non-distinct tuples are inserted into the
            // equivalence class
            if (count > 1) {
                illustrator.getEquivalenceClasses().get(eqClassIndex)
                        .add(tOut);
            }
            illustrator.addData((Tuple) tOut);
            return (Tuple) tOut;
        }

        boolean outInEqClass = true;
        try {
            for (int i = 1; i < numInputs + 1; i++) {
                DataBag dbs = (DataBag) ((Tuple) out).get(i);
                Iterator<Tuple> iter = dbs.iterator();
                // all inputs have >= 2 records
                if (dbs.size() <= 1 && outInEqClass) {
                    outInEqClass = false;
                }
                while (iter.hasNext()) {
                    Tuple tmp = iter.next();
                    // any of synthetic data in bags causes the output tuple
                    // to be synthetic
                    if (!synthetic && ((ExampleTuple) tmp).synthetic) {
                        synthetic = true;
                    }
                    lineageTracer.union(tOut, tmp);
                }
            }
        } catch (ExecException e) {
            // TODO better exception handling
            // Keep the original exception as the cause so its stack trace
            // is not lost.
            throw new RuntimeException("Illustrator exception :"
                    + e.getMessage(), e);
        }
        if (outInEqClass) {
            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
        }
        tOut.synthetic = synthetic;
        illustrator.addData((Tuple) tOut);
        return tOut;
    }

    /**
     * @param pack the POPackage to record as this packager's parent
     */
    public void setParent(POPackage pack) {
        parent = pack;
    }

    /**
     * @return the number of equivalence classes; always 1 here
     */
    public int numberOfEquivalenceClasses() {
        return 1;
    }

    /**
     * Decides, once per instance, which bag implementation
     * {@link #getBag()} hands out. The default bag is selected when the job
     * configuration held by {@code PigMapReduce.sJobConfInternal} sets
     * {@code PigConfiguration.PIG_CACHEDBAG_TYPE} to "default"
     * (case-insensitive). Later calls do nothing.
     */
    public void checkBagType() {
        if (initialized) {
            return;
        }
        initialized = true;
        if (PigMapReduce.sJobConfInternal.get() != null) {
            String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
            if (bagType != null && bagType.equalsIgnoreCase("default")) {
                useDefaultBag = true;
            }
        }
    }

    /**
     * @return a new empty bag: a default bag when {@link #checkBagType()}
     *         selected it, otherwise an {@link InternalCachedBag}
     *         constructed with {@code numInputs}
     */
    public DataBag getBag() {
        return useDefaultBag ? mBagFactory.newDefaultBag()
        // In a very rare case if there is a POStream after this
        // POJoinPackage in the pipeline and is also blocking the pipeline;
        // constructor argument should be 2 * numInputs. But for one obscure
        // case we don't want to pay the penalty all the time.
                : new InternalCachedBag(numInputs);
    }
}