blob: b0a714f6216f70a8345cfd8f50af35d8ee605c1c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
/**
* An operator model for a Map Reduce job.
* Acts as a host to the plans that will
* execute in map, reduce and optionally combine
* phases. These will be embedded in the MROperPlan
* in order to capture the dependencies amongst jobs.
*/
public class MapReduceOper extends Operator<MROpPlanVisitor> {
    private static final long serialVersionUID = 1L;

    //The physical plan that should be executed
    //in the map phase
    public PhysicalPlan mapPlan;

    //The physical plan that should be executed
    //in the reduce phase
    public PhysicalPlan reducePlan;

    //The physical plan that should be executed
    //in the combine phase if one exists. Will be used
    //by the optimizer.
    public PhysicalPlan combinePlan;

    // key for the map plan
    // this is needed when the key is null to create
    // an appropriate NullableXXXWritable object
    public byte mapKeyType;

    //record the map key types of all splittees
    public byte[] mapKeyTypeOfSplittees;

    //Indicates that the map plan creation
    //is complete
    boolean mapDone = false;

    //Indicates that the reduce plan creation
    //is complete
    boolean reduceDone = false;

    // Indicates that there is an operator which uses endOfAllInput flag in the
    // map plan
    boolean endOfAllInputInMap = false;

    // Indicates that there is an operator which uses endOfAllInput flag in the
    // reduce plan
    boolean endOfAllInputInReduce = false;

    //Indicates if this job is an order by job
    boolean globalSort = false;

    // Indicates if this is a limit after a sort
    boolean limitAfterSort = false;

    // Indicate if the entire purpose for this map reduce job is doing limit, does not change
    // anything else. This is to help POPackageAnnotator to find the right POPackage to annotate
    boolean limitOnly = false;

    // The single (mutually exclusive) job-level feature of this MROper,
    // set via the mark*() methods below.
    OPER_FEATURE feature = OPER_FEATURE.NONE;

    // If true, putting an identity combine in this
    // mapreduce job will speed things up.
    boolean needsDistinctCombiner = false;

    // If true, we will use secondary key in the map-reduce job
    boolean useSecondaryKey = false;

    //The quantiles file name if globalSort is true
    String quantFile;

    //The sort order of the columns;
    //asc is true and desc is false
    boolean[] sortOrder;

    // Sort order for secondary keys;
    boolean[] secondarySortOrder;

    public Set<String> UDFs;

    public Set<PhysicalOperator> scalars;

    // Indicates if a UDF comparator is used
    boolean isUDFComparatorUsed = false;

    transient NodeIdGenerator nig;

    private String scope;

    int requestedParallelism = -1;

    // estimated at runtime
    int estimatedParallelism = -1;

    // calculated at runtime
    int runtimeParallelism = -1;

    /* Name of the Custom Partitioner used */
    String customPartitioner = null;

    // Last POLimit value in this map reduce operator, needed by LimitAdjuster
    // to add additional map reduce operator with 1 reducer after this
    long limit = -1;

    // POLimit can also have an expression. See PIG-1926
    PhysicalPlan limitPlan = null;

    // Indicates that this MROper is a splitter MROper.
    // That is, this MROper ends due to a POSPlit operator.
    private boolean splitter = false;

    // Set to true if it is skewed join
    private boolean skewedJoin = false;

    // Name of the partition file generated by sampling process,
    // Used by Skewed Join
    private String skewedJoinPartitionFile;

    // Flag to communicate from MRCompiler to JobControlCompiler what kind of
    // comparator is used by Hadoop for sorting for this MROper.
    // By default, set to false which will make Pig provide raw comparators.
    // Set to true in indexing job generated in map-side cogroup, merge join.
    private boolean usingTypedComparator = false;

    // Flag to indicate if the small input splits need to be combined to form a larger
    // one in order to reduce the number of mappers. For merge join, both tables
    // are NOT combinable for correctness.
    private boolean combineSmallSplits = true;

    // Map of the physical operator in physical plan to the one in MR plan: only needed
    // if the physical operator is changed/replaced in MR compilation due to, e.g., optimization
    public MultiMap<PhysicalOperator, PhysicalOperator> phyToMRMap;

    // Mutually exclusive job-level classifications of an MROper.
    // ('static' is implicit on nested enums, so it is omitted here.)
    private enum OPER_FEATURE {
        NONE,
        // Indicate if this job is a sampling job
        SAMPLER,
        // Indicate if this job is a merge indexer
        INDEXER,
        // Indicate if this job is a group by job
        GROUPBY,
        // Indicate if this job is a cogroup job
        COGROUP,
        // Indicate if this job is a regular join job
        HASHJOIN
    }

    /**
     * Creates an empty MROper in the given key's scope, with fresh
     * (empty) map, combine and reduce plans.
     *
     * @param k the operator key identifying this MROper in the MR plan
     */
    public MapReduceOper(OperatorKey k) {
        super(k);
        mapPlan = new PhysicalPlan();
        combinePlan = new PhysicalPlan();
        reducePlan = new PhysicalPlan();
        UDFs = new HashSet<String>();
        scalars = new HashSet<PhysicalOperator>();
        nig = NodeIdGenerator.getGenerator();
        scope = k.getScope();
        phyToMRMap = new MultiMap<PhysicalOperator, PhysicalOperator>();
    }

    /**
     * Prefixes every line of {@code text} with {@code tab}, used to indent
     * nested plan printouts inside {@link #name()}.
     */
    private String shiftStringByTabs(String text, String tab) {
        StringBuilder sb = new StringBuilder();
        String[] lines = text.split("\n");
        for (int i = 0; i < lines.length; i++) {
            if (i > 0) {
                sb.append("\n");
            }
            sb.append(tab);
            sb.append(lines[i]);
        }
        return sb.toString();
    }

    /**
     * Uses the string representation of the
     * component plans to identify itself.
     */
    @Override
    public String name() {
        String udfStr = getUDFsAsStr();
        StringBuilder sb = new StringBuilder("MapReduce" + "(" + requestedParallelism +
                (udfStr.isEmpty() ? "" : ",") + udfStr + ")" + " - " + mKey.toString()
                + ":\n");
        // Remember where the header ends: the reduce plan (if any) is
        // inserted at this index, so it ends up printed before the map plan.
        int index = sb.length();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        if (!mapPlan.isEmpty()) {
            mapPlan.explain(baos);
            sb.append(shiftStringByTabs(baos.toString(), "| "));
        } else {
            sb.append("Map Plan Empty");
        }
        if (!reducePlan.isEmpty()) {
            baos.reset();
            reducePlan.explain(baos);
            sb.insert(index, shiftStringByTabs(baos.toString(), "| ") + "\n");
        } else {
            sb.insert(index, "Reduce Plan Empty" + "\n");
        }
        return sb.toString();
    }

    /**
     * Returns the simple (package-stripped) names of all UDFs used by this
     * job, comma-separated, or the empty string when there are none.
     */
    private String getUDFsAsStr() {
        StringBuilder sb = new StringBuilder();
        if (UDFs != null && !UDFs.isEmpty()) {
            for (String str : UDFs) {
                // keep only the class name after the last '.'
                sb.append(str.substring(str.lastIndexOf('.') + 1));
                sb.append(',');
            }
            // drop the trailing comma
            sb.deleteCharAt(sb.length() - 1);
        }
        return sb.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return true;
    }

    @Override
    public void visit(MROpPlanVisitor v) throws VisitorException {
        v.visitMROp(this);
    }

    public boolean isMapDone() {
        return mapDone;
    }

    public void setMapDone(boolean mapDone) {
        this.mapDone = mapDone;
    }

    /**
     * Marks the map plan done; if the plan has more than one leaf, a POUnion
     * is appended so the plan ends in a single leaf.
     */
    public void setMapDoneSingle(boolean mapDone) throws PlanException {
        this.mapDone = mapDone;
        if (mapDone && mapPlan.getLeaves().size() > 1) {
            mapPlan.addAsLeaf(getUnion());
        }
    }

    /**
     * Marks the map plan done; if the plan has any leaves, a POUnion is
     * appended (even for a single leaf, unlike setMapDoneSingle).
     */
    public void setMapDoneMultiple(boolean mapDone) throws PlanException {
        this.mapDone = mapDone;
        if (mapDone && mapPlan.getLeaves().size() > 0) {
            mapPlan.addAsLeaf(getUnion());
        }
    }

    // Builds a fresh POUnion keyed in this MROper's scope.
    private POUnion getUnion() {
        return new POUnion(new OperatorKey(scope, nig.getNextNodeId(scope)));
    }

    public boolean isReduceDone() {
        return reduceDone;
    }

    public void setReduceDone(boolean reduceDone) {
        this.reduceDone = reduceDone;
    }

    public boolean isGlobalSort() {
        return globalSort;
    }

    // NOTE: keys off the sampled partition file, NOT the skewedJoin flag;
    // getSkewedJoin() below reads the flag instead — the two are distinct.
    public boolean isSkewedJoin() {
        return (skewedJoinPartitionFile != null);
    }

    public void setSkewedJoinPartitionFile(String file) {
        skewedJoinPartitionFile = file;
    }

    public String getSkewedJoinPartitionFile() {
        return skewedJoinPartitionFile;
    }

    public void setSkewedJoin(boolean skJoin) {
        this.skewedJoin = skJoin;
    }

    public boolean getSkewedJoin() {
        return skewedJoin;
    }

    public void setGlobalSort(boolean globalSort) {
        this.globalSort = globalSort;
    }

    public boolean isLimitAfterSort() {
        return limitAfterSort;
    }

    public void setLimitAfterSort(boolean las) {
        limitAfterSort = las;
    }

    public boolean isLimitOnly() {
        return limitOnly;
    }

    public void setLimitOnly(boolean limitOnly) {
        this.limitOnly = limitOnly;
    }

    public boolean isIndexer() {
        return (feature == OPER_FEATURE.INDEXER);
    }

    public void markIndexer() {
        feature = OPER_FEATURE.INDEXER;
    }

    public boolean isSampler() {
        return (feature == OPER_FEATURE.SAMPLER);
    }

    public void markSampler() {
        feature = OPER_FEATURE.SAMPLER;
    }

    public boolean isGroupBy() {
        return (feature == OPER_FEATURE.GROUPBY);
    }

    public void markGroupBy() {
        feature = OPER_FEATURE.GROUPBY;
    }

    public boolean isCogroup() {
        return (feature == OPER_FEATURE.COGROUP);
    }

    public void markCogroup() {
        feature = OPER_FEATURE.COGROUP;
    }

    public boolean isRegularJoin() {
        return (feature == OPER_FEATURE.HASHJOIN);
    }

    public void markRegularJoin() {
        feature = OPER_FEATURE.HASHJOIN;
    }

    public boolean needsDistinctCombiner() {
        return needsDistinctCombiner;
    }

    public void setNeedsDistinctCombiner(boolean nic) {
        needsDistinctCombiner = nic;
    }

    public String getQuantFile() {
        return quantFile;
    }

    public void setQuantFile(String quantFile) {
        this.quantFile = quantFile;
    }

    /**
     * Stores a defensive copy of the sort order (asc=true, desc=false);
     * a null argument leaves the current value untouched.
     */
    public void setSortOrder(boolean[] sortOrder) {
        if (null == sortOrder) return;
        this.sortOrder = sortOrder.clone();
    }

    /**
     * Stores a defensive copy of the secondary-key sort order;
     * a null argument leaves the current value untouched.
     */
    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
        if (null == secondarySortOrder) return;
        this.secondarySortOrder = secondarySortOrder.clone();
    }

    public boolean[] getSortOrder() {
        return sortOrder;
    }

    public boolean[] getSecondarySortOrder() {
        return secondarySortOrder;
    }

    /**
     * @return whether end of all input is set in the map plan
     */
    public boolean isEndOfAllInputSetInMap() {
        return endOfAllInputInMap;
    }

    /**
     * @param endOfAllInputInMap the streamInMap to set
     */
    public void setEndOfAllInputInMap(boolean endOfAllInputInMap) {
        this.endOfAllInputInMap = endOfAllInputInMap;
    }

    /**
     * @return whether end of all input is set in the reduce plan
     */
    public boolean isEndOfAllInputSetInReduce() {
        return endOfAllInputInReduce;
    }

    /**
     * @param endOfAllInputInReduce the streamInReduce to set
     */
    public void setEndOfAllInputInReduce(boolean endOfAllInputInReduce) {
        this.endOfAllInputInReduce = endOfAllInputInReduce;
    }

    public int getRequestedParallelism() {
        return requestedParallelism;
    }

    public String getCustomPartitioner() {
        return customPartitioner;
    }

    public void setSplitter(boolean spl) {
        splitter = spl;
    }

    public boolean isSplitter() {
        return splitter;
    }

    public boolean getUseSecondaryKey() {
        return useSecondaryKey;
    }

    public void setUseSecondaryKey(boolean useSecondaryKey) {
        this.useSecondaryKey = useSecondaryKey;
    }

    protected boolean usingTypedComparator() {
        return usingTypedComparator;
    }

    protected void useTypedComparator(boolean useTypedComparator) {
        this.usingTypedComparator = useTypedComparator;
    }

    protected void noCombineSmallSplits() {
        combineSmallSplits = false;
    }

    public boolean combineSmallSplits() {
        return combineSmallSplits;
    }

    /**
     * @return true if any root of the map plan is a PORank
     */
    public boolean isRankOperation() {
        return !getRankOperationId().isEmpty();
    }

    /**
     * Collects the operation IDs of all PORank roots in the map plan.
     */
    public ArrayList<String> getRankOperationId() {
        ArrayList<String> operationIDs = new ArrayList<String>();
        for (PhysicalOperator operation : this.mapPlan.getRoots()) {
            if (operation instanceof PORank) {
                operationIDs.add(((PORank) operation).getOperationID());
            }
        }
        return operationIDs;
    }

    public boolean isCounterOperation() {
        return (getCounterOperation() != null);
    }

    public boolean isRowNumber() {
        POCounter counter = getCounterOperation();
        return (counter != null) ? counter.isRowNumber() : false;
    }

    public String getOperationID() {
        POCounter counter = getCounterOperation();
        return (counter != null) ? counter.getOperationID() : null;
    }

    // Looks for a POCounter in the map plan first, then in the reduce plan.
    private POCounter getCounterOperation() {
        POCounter counter = getCounterOperation(this.mapPlan);
        if (counter == null) {
            counter = getCounterOperation(this.reducePlan);
        }
        return counter;
    }

    /**
     * Finds a POCounter among the plan's leaves, either directly or as the
     * immediate predecessor of a POStore leaf; returns null if none exists.
     */
    private POCounter getCounterOperation(PhysicalPlan plan) {
        for (PhysicalOperator operator : plan.getLeaves()) {
            if (operator instanceof POCounter) {
                return (POCounter) operator;
            } else if (operator instanceof POStore) {
                List<PhysicalOperator> preds = plan.getPredecessors(operator);
                if (preds != null) {
                    for (PhysicalOperator pred : preds) {
                        if (pred instanceof POCounter) {
                            return (POCounter) pred;
                        }
                    }
                }
            }
        }
        return null;
    }
}