blob: b158756b48560f04cc165dbbca95d1d3f46e650c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin.TuplesToSchemaTupleList;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
/**
 * The operator models the join keys using the Local Rearrange operators which
 * are configured with the plans specified by the user. It also sets up one
 * Hashtable per replicated input, mapping each key (k), stored as a Tuple, to a
 * DataBag which holds all the values in that input having the same key (k).
 * The getNext() reads an input from its predecessor and separates it into key
 * and value. It configures a foreach operator with the databags obtained from
 * each Hashtable for the key and also with the value for the fragment input. It
 * then returns the tuples produced by this foreach operator.
 */
// We intentionally skip type checking in backend for performance reasons
public class POFRJoin extends PhysicalOperator {
    private static final Log log = LogFactory.getLog(POFRJoin.class);
    private static final long serialVersionUID = 1L;

    // The position in the input list which denotes the fragmented input;
    // all other inputs are replicated into in-memory hash tables.
    protected int fragment;
    // There can be n inputs each being a List<PhysicalPlan>
    // Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
    protected List<List<PhysicalPlan>> phyPlanLists;
    // The key type for each Local Rearrange operator; a multi-column key is
    // modeled as DataType.TUPLE (see createJoinPlans)
    protected List<List<Byte>> keyTypes;
    // The Local Rearrange operators modeling the join key, one per input
    protected POLocalRearrange[] LRs;
    // The set of files that represent the replicated inputs
    protected FileSpec[] replFiles;
    // Used to configure the foreach operator: one ConstantExpression per
    // input; the fragment's CE holds a Tuple, all others hold a Bag
    protected ConstantExpression[] constExps;
    // Used to produce the cross product of various bags
    protected POForEach fe;
    // A Boolean variable which denotes if this is a LeftOuter Join or an Inner
    // Join
    protected boolean isLeftOuterJoin;
    // A single-tuple bag holding the null tuple used to pad non-matching
    // rows in a left-outer join
    protected DataBag nullBag;
    // Optional per-input schemas enabling the SchemaTuple optimization
    // (null entries disable it for that input)
    protected Schema[] inputSchemas;
    // Optional per-input key schemas for SchemaTuple key canonicalization
    protected Schema[] keySchemas;
    // The array of Hashtables one per replicated input. replicates[fragment] =
    // null since the fragment input is streamed, not replicated.
    protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
    // variable which denotes whether we are returning tuples from the foreach
    // operator (i.e. we are mid-way through a cross-product for one key)
    protected transient boolean processingPlan;
    // A dummy tuple attached to the foreach operator to drive it; the real
    // values are injected through the ConstantExpressions
    protected transient Tuple dumTup;
    // Guards the lazy one-time setup performed in getNextTuple()
    protected transient boolean setUp;

    /**
     * Convenience constructor without schema information; the SchemaTuple
     * optimization is disabled (inputSchemas/keySchemas default to null
     * entries).
     *
     * @param k operator key
     * @param rp requested parallelism, passed through to PhysicalOperator
     * @param inp predecessor operators
     * @param ppLists one list of key plans per joined input
     * @param keyTypes key type(s) per input, parallel to ppLists
     * @param replFiles files holding each replicated input (fragment slot unused)
     * @param fragment index of the fragmented (streamed) input
     * @param isLeftOuter true for left-outer join semantics
     * @param nullTuple null-padding tuple used when isLeftOuter is true
     * @throws ExecException if the join plans cannot be set up
     */
    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
            List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
            FileSpec[] replFiles, int fragment, boolean isLeftOuter,
            Tuple nullTuple) throws ExecException {
        this(k, rp, inp, ppLists, keyTypes, replFiles, fragment, isLeftOuter, nullTuple, null, null);
    }

    /**
     * Full constructor. When inputSchemas/keySchemas are provided (non-null
     * entries), setUpHashMap() builds SchemaTuple-backed structures for the
     * corresponding replicated inputs.
     *
     * @throws ExecException if the join plans cannot be set up
     */
    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
            List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
            FileSpec[] replFiles, int fragment, boolean isLeftOuter,
            Tuple nullTuple,
            Schema[] inputSchemas,
            Schema[] keySchemas)
            throws ExecException {
        super(k, rp, inp);
        phyPlanLists = ppLists;
        this.fragment = fragment;
        this.keyTypes = keyTypes;
        this.replFiles = replFiles;
        LRs = new POLocalRearrange[ppLists.size()];
        constExps = new ConstantExpression[ppLists.size()];
        createJoinPlans(k);
        List<Tuple> tupList = new ArrayList<Tuple>();
        tupList.add(nullTuple);
        nullBag = new NonSpillableDataBag(tupList);
        this.isLeftOuterJoin = isLeftOuter;
        if (inputSchemas != null) {
            this.inputSchemas = inputSchemas;
        } else {
            // No schemas supplied: size the array off replFiles so indexed
            // access in setUpHashMap() stays in bounds, with all-null entries
            this.inputSchemas = new Schema[replFiles == null ? 0 : replFiles.length];
        }
        if (keySchemas != null) {
            this.keySchemas = keySchemas;
        } else {
            this.keySchemas = new Schema[replFiles == null ? 0 : replFiles.length];
        }
    }

    /**
     * Copy constructor. Performs a shallow copy: plans, LRs, replicate maps
     * and schemas are shared with the source operator.
     */
    public POFRJoin(POFRJoin copy) throws ExecException {
        super(copy);
        this.phyPlanLists = copy.phyPlanLists;
        this.fragment = copy.fragment;
        this.keyTypes = copy.keyTypes;
        this.replFiles = copy.replFiles;
        this.replicates = copy.replicates;
        this.LRs = copy.LRs;
        this.fe = copy.fe;
        this.constExps = copy.constExps;
        this.processingPlan = copy.processingPlan;
        this.nullBag = copy.nullBag;
        this.isLeftOuterJoin = copy.isLeftOuterJoin;
        this.inputSchemas = copy.inputSchemas;
        this.keySchemas = copy.keySchemas;
    }

    // Generates a fresh OperatorKey in the same scope as the given key
    private OperatorKey genKey(OperatorKey old) {
        return new OperatorKey(old.scope, NodeIdGenerator.getGenerator()
                .getNextNodeId(old.scope));
    }

    /**
     * Configures the Local Rearrange operators & the foreach operator.
     * One LR per input extracts that input's join key; one ConstantExpression
     * per input feeds the cross-product POForEach — the fragment's CE is typed
     * TUPLE (the streamed row) and every other CE is typed BAG (the matching
     * replicated rows).
     *
     * @param old key whose scope is reused for the generated operators
     * @throws ExecException if setting the LR plans fails
     */
    private void createJoinPlans(OperatorKey old) throws ExecException {
        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
        List<Boolean> flatList = new ArrayList<Boolean>();
        int i = -1;
        for (List<PhysicalPlan> ppLst : phyPlanLists) {
            ++i;
            POLocalRearrange lr = new POLocalRearrange(genKey(old));
            lr.setIndex(i);
            lr.setResultType(DataType.TUPLE);
            // A multi-column key is represented as a TUPLE; a single-column
            // key keeps its own type
            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE
                    : keyTypes.get(i).get(0));
            try {
                lr.setPlans(ppLst);
            } catch (PlanException pe) {
                int errCode = 2071;
                String msg = "Problem with setting up local rearrange's plans.";
                throw new ExecException(msg, errCode, PigException.BUG, pe);
            }
            LRs[i] = lr;
            ConstantExpression ce = new ConstantExpression(genKey(old));
            ce.setResultType((i == fragment) ? DataType.TUPLE : DataType.BAG);
            constExps[i] = ce;
            PhysicalPlan pp = new PhysicalPlan();
            pp.add(ce);
            fePlans.add(pp);
            flatList.add(true);
        }
        // The ForEach operator here is used for generating a Cross-Product
        // It is given a set of constant expressions with
        // Tuple,(Bag|Tuple),(...)
        // It does a cross product on that and produces output.
        fe = new POForEach(genKey(old), -1, fePlans, flatList);
    }

    @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
        v.visitFRJoin(this);
    }

    @Override
    public String name() {
        return getAliasString() + "FRJoin[" + DataType.findTypeName(resultType)
                + "]" + " - " + mKey.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    /**
     * Pull-based join driver. On the first call it lazily builds the
     * replication hash tables (setUpHashMap). Then it alternates between two
     * phases: draining the cross-product foreach for the current fragment
     * tuple (processingPlan == true), and reading the next fragment tuple,
     * probing every replicate map with its key, and (re)configuring the
     * foreach's constant expressions before recursing to start the next
     * cross-product.
     */
    @Override
    public Result getNextTuple() throws ExecException {
        Result res = null;
        Result inp = null;
        // Lazy one-time setup instead of doing it at construction or
        // deserialization time (see readObject and the transient setUp flag)
        if (!setUp) {
            replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
            for (int i = 0 ; i < phyPlanLists.size(); i++) {
                replicates.add(null);
            }
            dumTup = mTupleFactory.newTuple(1);
            setUpHashMap();
            setUp = true;
        }
        if (processingPlan) {
            // Return tuples from the for each operator
            // Assumes that it is configured appropriately with
            // the bags for the current key.
            while (true) {
                res = fe.getNextTuple();
                if (res.returnStatus == POStatus.STATUS_OK) {
                    return res;
                }
                if (res.returnStatus == POStatus.STATUS_EOP) {
                    // We have completed all cross-products now its time to move
                    // to next tuple of left side
                    processingPlan = false;
                    break;
                }
                if (res.returnStatus == POStatus.STATUS_ERR) {
                    return res;
                }
                if (res.returnStatus == POStatus.STATUS_NULL) {
                    continue;
                }
            }
        }
        while (true) {
            // Process the current input
            inp = processInput();
            if (inp.returnStatus == POStatus.STATUS_EOP
                    || inp.returnStatus == POStatus.STATUS_ERR) {
                return inp;
            } else if (inp.returnStatus == POStatus.STATUS_NULL) {
                continue;
            }
            // Separate Key & Value using the fragment's LR operator;
            // LR output layout is (index, key, value) — see getValueTuple
            POLocalRearrange lr = LRs[fragment];
            lr.attachInput((Tuple) inp.result);
            Result lrOut = lr.getNextTuple();
            if (lrOut.returnStatus != POStatus.STATUS_OK) {
                log.error("LocalRearrange isn't configured right or is not working");
                return new Result();
            }
            Tuple lrOutTuple = (Tuple) lrOut.result;
            Object key = lrOutTuple.get(1);
            Tuple value = getValueTuple(lr, lrOutTuple);
            lr.detachInput();
            // Configure the for each operator with the relevant bags
            int i = -1;
            boolean noMatch = false;
            for (ConstantExpression ce : constExps) {
                ++i;
                if (i == fragment) {
                    // We set the first CE as the tuple from fragmented Left
                    ce.setValue(value);
                    continue;
                }
                Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
                if (replicate.get(key) == null) {
                    // No match in this replicated input: pad with the null
                    // bag for left-outer, otherwise flag the miss and stop
                    if (isLeftOuterJoin) {
                        ce.setValue(nullBag);
                    }
                    noMatch = true;
                    break;
                }
                ce.setValue(new NonSpillableDataBag(replicate.get(key)));
            }
            // If this is not LeftOuter Join and there was no match we
            // skip the processing of this left tuple and move ahead
            if (!isLeftOuterJoin && noMatch) {
                continue;
            }
            fe.attachInput(dumTup);
            processingPlan = true;
            // We are all set, we call getNext (this function) which will call
            // getNext on ForEach
            // And that will return one tuple of Cross-Product between set
            // constant Expressions
            // All subsequent calls ( by parent ) to this function will return
            // next tuple of crossproduct
            Result gn = getNextTuple();
            return gn;
        }
    }

    /**
     * HashMap from join key to the list of matching value tuples for one
     * replicated input. When a key SchemaTupleFactory is present, Tuple keys
     * are canonicalized to SchemaTuples (via TuplesToSchemaTupleList.convert)
     * on both put and get so lookups hash consistently.
     */
    protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
        private SchemaTupleFactory tf;

        public TupleToMapKey(int ct, SchemaTupleFactory tf) {
            super(ct);
            this.tf = tf;
        }

        @Override
        public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
            if (tf != null && key instanceof Tuple) {
                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
            }
            // NOTE(review): this cast assumes any stored value is a
            // TuplesToSchemaTupleList, but setUpHashMap() inserts plain
            // ArrayLists when inputSchemaTupleFactory is null — confirm the
            // key-factory-without-input-factory combination cannot occur
            return (TuplesToSchemaTupleList) super.put(key, val);
        }

        @Override
        public TuplesToSchemaTupleList get(Object key) {
            if (tf != null && key instanceof Tuple) {
                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
            }
            // NOTE(review): same cast assumption as in put() above
            return (TuplesToSchemaTupleList) super.get(key);
        }
    }

    /**
     * Builds the HashMaps by reading each replicated input from the DFS using a
     * Load operator. Each replicated input is run through its Local Rearrange
     * to split key from value; rows with a null key are skipped. The fragment
     * input's slot in 'replicates' is left null since it is streamed.
     *
     * @throws ExecException if loading or key extraction fails
     */
    protected void setUpHashMap() throws ExecException {
        // Resolve SchemaTuple factories up front; a null factory for an input
        // means that input falls back to plain Tuples/ArrayLists
        SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
        SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
        for (int i = 0; i < inputSchemas.length; i++) {
            Schema schema = inputSchemas[i];
            if (schema != null) {
                log.debug("Using SchemaTuple for FR Join Schema: " + schema);
                inputSchemaTupleFactories[i] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, GenContext.FR_JOIN);
            }
            schema = keySchemas[i];
            if (schema != null) {
                log.debug("Using SchemaTuple for FR Join key Schema: " + schema);
                keySchemaTupleFactories[i] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, GenContext.FR_JOIN);
            }
        }
        int i = -1;
        long time1 = System.currentTimeMillis();
        for (FileSpec replFile : replFiles) {
            ++i;
            SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[i];
            SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
            if (i == fragment) {
                // The fragment input is streamed, not replicated
                replicates.set(i, null);
                continue;
            }
            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
                    replFile);
            // Replicated files are read through the local filesystem
            Properties props = ConfigurationUtil.getLocalFSProperties();
            PigContext pc = new PigContext(ExecType.LOCAL, props);
            ld.setPc(pc);
            // We use LocalRearrange Operator to separate Key and Values
            // eg. ( a, b, c ) would generate a, ( a, b, c )
            // And we use 'a' as the key to the HashMap
            // The rest '( a, b, c )' is added to HashMap as value
            // We could have manually done this, but LocalRearrange does the
            // same thing, so utilizing its functionality
            POLocalRearrange lr = LRs[i];
            lr.setInputs(Arrays.asList((PhysicalOperator) ld));
            Map<Object, ArrayList<Tuple>> replicate;
            if (keySchemaTupleFactory == null) {
                replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
            } else {
                replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
            }
            log.debug("Completed setup. Trying to build replication hash table");
            for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
                if (getReporter() != null)
                    getReporter().progress();
                Tuple tuple = (Tuple) res.result;
                Object key = tuple.get(1);
                // Null keys never match in a join, so drop them here
                if (isKeyNull(key)) continue;
                Tuple value = getValueTuple(lr, tuple);
                ArrayList<Tuple> values = replicate.get(key);
                if (values == null) {
                    if (inputSchemaTupleFactory == null) {
                        values = new ArrayList<Tuple>(1);
                    } else {
                        values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
                    }
                    replicate.put(key, values);
                }
                values.add(value);
            }
            replicates.set(i, replicate);
        }
        long time2 = System.currentTimeMillis();
        log.debug("Hash Table built. Time taken: " + (time2 - time1));
    }

    /**
     * Returns true if the key is null, or — for a compound (tuple) key — if
     * any of its fields is null.
     */
    protected boolean isKeyNull(Object key) throws ExecException {
        if (key == null) return true;
        if (key instanceof Tuple) {
            Tuple t = (Tuple)key;
            for (int i=0; i<t.size(); i++) {
                if (t.isNull(i)) return true;
            }
        }
        return false;
    }

    /**
     * Custom deserialization hook. The replication hash tables are
     * deliberately NOT rebuilt here; that happens lazily on the first
     * getNextTuple() call, guarded by the transient setUp flag.
     */
    private void readObject(ObjectInputStream is) throws IOException,
            ClassNotFoundException, ExecException {
        is.defaultReadObject();
        // setUpHashTable();
    }

    /*
     * Extracts the value tuple from the LR operator's output tuple
     * (index, key, value). Fields that the LR plan moved into the key are
     * stitched back into their original positions so the returned tuple has
     * the input's full layout.
     */
    protected Tuple getValueTuple(POLocalRearrange lr, Tuple tuple)
            throws ExecException {
        Tuple val = (Tuple) tuple.get(2);
        Tuple retTup = null;
        boolean isProjectStar = lr.isProjectStar();
        // Maps original field position -> position within the key
        Map<Integer, Integer> keyLookup = lr.getProjectedColsMap();
        int keyLookupSize = keyLookup.size();
        Object key = tuple.get(1);
        boolean isKeyTuple = lr.isKeyTuple();
        Tuple keyAsTuple = isKeyTuple ? (Tuple) tuple.get(1) : null;
        if (keyLookupSize > 0) {
            // we have some fields of the "value" in the
            // "key".
            int finalValueSize = keyLookupSize + val.size();
            retTup = mTupleFactory.newTuple(finalValueSize);
            int valIndex = 0; // an index for accessing elements from
            // the value (val) that we have currently
            for (int i = 0; i < finalValueSize; i++) {
                Integer keyIndex = keyLookup.get(i);
                if (keyIndex == null) {
                    // the field for this index is not in the
                    // key - so just take it from the "value"
                    // we were handed
                    retTup.set(i, val.get(valIndex));
                    valIndex++;
                } else {
                    // the field for this index is in the key
                    if (isKeyTuple) {
                        // the key is a tuple, extract the
                        // field out of the tuple
                        retTup.set(i, keyAsTuple.get(keyIndex));
                    } else {
                        retTup.set(i, key);
                    }
                }
            }
        } else if (isProjectStar) {
            // the whole "value" is present in the "key"
            retTup = mTupleFactory.newTuple(keyAsTuple.getAll());
        } else {
            // there is no field of the "value" in the
            // "key" - so just make a copy of what we got
            // as the "value"
            retTup = mTupleFactory.newTuple(val.getAll());
        }
        return retTup;
    }

    public List<List<PhysicalPlan>> getJoinPlans() {
        return phyPlanLists;
    }

    public POLocalRearrange[] getLRs() {
        return LRs;
    }

    public boolean isLeftOuterJoin() {
        return isLeftOuterJoin;
    }

    public int getFragment() {
        return fragment;
    }

    public void setFragment(int fragment) {
        this.fragment = fragment;
    }

    public FileSpec[] getReplFiles() {
        return replFiles;
    }

    public void setReplFiles(FileSpec[] replFiles) {
        this.replFiles = replFiles;
    }

    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        // no op: all handled by the preceding POForEach
        return null;
    }

    /**
     * Deep-clones the join plans and regenerates the LR/CE/foreach operators
     * via createJoinPlans; nullBag, inputSchemas and keySchemas are shared
     * as they are treated as read-only.
     */
    @Override
    public POFRJoin clone() throws CloneNotSupportedException {
        POFRJoin clone = (POFRJoin) super.clone();
        // Not doing deep copy of nullBag, inputSchemas, keySchemas
        // as they are read only
        clone.phyPlanLists = new ArrayList<List<PhysicalPlan>>(phyPlanLists.size());
        for (List<PhysicalPlan> ppLst : phyPlanLists) {
            clone.phyPlanLists.add(clonePlans(ppLst));
        }
        clone.LRs = new POLocalRearrange[phyPlanLists.size()];
        clone.constExps = new ConstantExpression[phyPlanLists.size()];
        try {
            clone.createJoinPlans(getOperatorKey());
        } catch (ExecException e) {
            CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting plans of " + this.getClass().getSimpleName());
            cnse.initCause(e);
            throw cnse;
        }
        return clone;
    }
}