blob: 4d134aeeef78c656ed1776b6cab4ed08e0969838 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.pig.backend.hadoop.executionengine.tez.plan;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
* An operator model for a Tez job. Acts as a host to the plans that will
* execute in Tez vertices.
public class TezOperator extends Operator<TezOpPlanVisitor> {
private static final long serialVersionUID = 1L;
// Processor pipeline
// Note TezOperator needs to be serialized and de-serialized to
// be used in PigGraceShuffleVertexManager, some fields are either
// big, or not serializable, and not in use in PigGraceShuffleVertexManager,
// mark them as transient: plan, vertexGroupInfo, inputSplitInfo
public transient PhysicalPlan plan;
// Descriptors for out-bound edges.
public Map<OperatorKey, TezEdgeDescriptor> outEdges;
// Descriptors for in-bound edges.
public Map<OperatorKey, TezEdgeDescriptor> inEdges;
public Set<String> UDFs;
public Set<PhysicalOperator> scalars;
// Use AtomicInteger for access by reference and being able to reset in
// TezDAGBuilder based on number of input splits.
// We just need mutability and not concurrency
// This is to ensure that vertexes with 1-1 edge have same parallelism
// even when parallelism of source vertex changes.
// Can change to int and set to -1 if TEZ-800 gets fixed.
private AtomicInteger requestedParallelism = new AtomicInteger(-1);
private int estimatedParallelism = -1;
// Do not estimate parallelism for specific vertices like limit, indexer,
// etc which should always be one
private boolean dontEstimateParallelism = false;
// Override user specified intermediate parallelism for cases
// like skewed join followed by group by + combiner if estimation is higher
// In mapreduce group by + combiner runs in map phase and uses more maps (default 128MB per map)
// while skewed join reducers process more (default 1G per reducer) which makes MRR a disadvantage
private boolean overrideIntermediateParallelism = false;
// This is the parallelism of the vertex; it takes into account:
// 1. default_parallel
// 2. -1 parallelism for one_to_one edge
// 3. -1 parallelism for sort/skewed join
private int vertexParallelism = -1;
// TODO: When constructing Tez vertex, we have to specify how much resource
// the vertex will need. So we need to estimate these values while compiling
// physical plan into tez plan. For now, we're using default values - 1G mem
// and 1 core.
//int requestedMemory = 1024;
//int requestedCpu = 1;
// This indicates that this TezOper is a split operator
private boolean splitter;
// This indicates that this TezOper has POSplit as a predecessor.
private OperatorKey splitParent = null;
// Indicates that the plan creation is complete
boolean closed = false;
// Indicate whether we need to split the DAG below the operator
// The result is two or more connected DAGs inside the same plan container
boolean segmentBelow = false;
//The sort order of the columns;
//asc is true and desc is false
boolean[] sortOrder;
// Flag to indicate if the small input splits need to be combined to form a larger
// one in order to reduce the number of mappers. For merge join, both tables
// are NOT combinable for correctness.
private boolean combineSmallSplits = true;
// Used by partition vertex, if not null, need to collect sample sent from predecessor
private TezOperator sampleOperator = null;
// Used by sample vertex, send parallelism event to orderOperator
private TezOperator sortOperator = null;
// If the flag is set, FindQuantilesTez/PartitionSkewedKeysTez will use aggregated sample
// to calculate the number of parallelism at runtime, instead of the numQuantiles/totalReducers_
// parameter set statically
private boolean needEstimateParallelism = false;
// If true, we will use secondary key sort in the job
private boolean useSecondaryKey = false;
// Created lazily by addCrossKey(); stays null until that method is first called.
private List<String> crossKeys = null;
// Plain flag exposed through isUseMRMapSettings()/setUseMRMapSettings().
// NOTE(review): presumably tells the DAG builder to apply MapReduce map-side
// settings to this vertex - confirm against the callers.
private boolean useMRMapSettings = false;
// Plain flag exposed through isUseGraceParallelism()/setUseGraceParallelism().
private boolean useGraceParallelism = false;
// Cache used by getParallelismFactor(), keyed by the successor's OperatorKey;
// allocated lazily on the first call.
private Map<OperatorKey, Double> parallelismFactorPerSuccessor;
// Tri-state cache: null means "not computed yet". isIntermediateReducer()
// fills it in on first use and setIntermediateReducer() can overwrite it.
private Boolean intermediateReducer = null;
// Types of blocking operators. For now, we only support the following ones.
// Feature flags for a TezOperator. The is*() accessors of the enclosing class
// use each constant's ordinal() as a bit index into the "feature" BitSet, so
// the declaration order of the constants is significant.
// NOTE(review): the constant declarations themselves are not visible in this
// view of the file; only their describing comments remain. The accessors
// reference INDEXER, SAMPLER, SAMPLE_AGGREGATOR, SAMPLE_BASED_PARTITIONER,
// GLOBAL_SORT, GROUPBY, COGROUP, HASHJOIN, SKEWEDJOIN, LIMIT, LIMIT_AFTER_SORT,
// UNION, DISTINCT, NATIVE and RANK_COUNTER - confirm the full list and order
// against the complete file.
public static enum OPER_FEATURE {
// Indicate if this job is a merge indexer
// Indicate if this job is a sampling job
// Indicate if this job is a sample aggregation job
// Indicate if this job is a sample based partition job (order by/skewed join)
// Indicate if this job is a global sort
// Indicate if this job is a group by job
// Indicate if this job is a cogroup job
// Indicate if this job is a regular join job
// Indicate if this job is a skewed join job
// Indicate if this job is a limit job
// Indicate if this job is a limit job after sort
// Indicate if this job is a union job
// Indicate if this job is a distinct job
// Indicate if this job is a native job
// Indicate if this job does rank counter
// Features in the job/vertex. Mostly will be only one feature.
// But in some cases can have more than one.
// For eg: a vertex can be both GLOBAL SORT and LIMIT if parallelism is 1
BitSet feature = new BitSet();
private List<OperatorKey> vertexGroupMembers;
// For union
private transient VertexGroupInfo vertexGroupInfo;
// Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
private boolean isVertexGroup = false;
public static class LoaderInfo implements Serializable {
private List<POLoad> loads = null;
private ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
private ArrayList<String> inpSignatureLists = new ArrayList<String>();
private ArrayList<Long> inpLimits = new ArrayList<Long>();
private transient InputSplitInfo inputSplitInfo = null;
public List<POLoad> getLoads() {
return loads;
public void setLoads(List<POLoad> loads) {
this.loads = loads;
public ArrayList<FileSpec> getInp() {
return inp;
public void setInp(ArrayList<FileSpec> inp) {
this.inp = inp;
public ArrayList<String> getInpSignatureLists() {
return inpSignatureLists;
public void setInpSignatureLists(ArrayList<String> inpSignatureLists) {
this.inpSignatureLists = inpSignatureLists;
public ArrayList<Long> getInpLimits() {
return inpLimits;
public void setInpLimits(ArrayList<Long> inpLimits) {
this.inpLimits = inpLimits;
public InputSplitInfo getInputSplitInfo() {
return inputSplitInfo;
public void setInputSplitInfo(InputSplitInfo inputSplitInfo) {
this.inputSplitInfo = inputSplitInfo;
private LoaderInfo loaderInfo = new LoaderInfo();
private long totalInputFilesSize = -1;
// Creates an operator with an empty physical plan and empty edge, UDF and
// scalar collections.
// NOTE(review): the parameter k is not used in the lines visible here;
// presumably it is handed to the superclass constructor - confirm against the
// complete file.
public TezOperator(OperatorKey k) {
plan = new PhysicalPlan();
outEdges = Maps.newHashMap();
inEdges = Maps.newHashMap();
UDFs = Sets.newHashSet();
scalars = Sets.newHashSet();
public String getProcessorName() {
return PigProcessor.class.getName();
// Visitor entry point for a TezOpPlanVisitor.
// NOTE(review): the method body is not visible in this view; presumably it
// dispatches to the visitor with this operator - confirm.
public void visit(TezOpPlanVisitor v) throws VisitorException {
public boolean supportsMultipleInputs() {
return true;
public boolean supportsMultipleOutputs() {
return true;
public int getRequestedParallelism() {
return requestedParallelism.get();
// Setter counterpart of getRequestedParallelism().
// NOTE(review): the method body is not visible in this view; presumably it
// stores the value into the requestedParallelism AtomicInteger - confirm.
public void setRequestedParallelism(int requestedParallelism) {
public void setRequestedParallelismByReference(TezOperator oper) {
this.requestedParallelism = oper.requestedParallelism;
public int getEstimatedParallelism() {
return estimatedParallelism;
public void setEstimatedParallelism(int estimatedParallelism) {
this.estimatedParallelism = estimatedParallelism;
public int getEffectiveParallelism(int defaultParallelism) {
// PIG-4162: For intermediate reducers, use estimated parallelism over user set parallelism.
return getEstimatedParallelism() == -1
? (getRequestedParallelism() == -1 ? defaultParallelism : getRequestedParallelism())
: getEstimatedParallelism();
public boolean isDontEstimateParallelism() {
return dontEstimateParallelism;
public void setDontEstimateParallelism(boolean dontEstimateParallelism) {
this.dontEstimateParallelism = dontEstimateParallelism;
public boolean isOverrideIntermediateParallelism() {
return overrideIntermediateParallelism;
public void setOverrideIntermediateParallelism(
boolean overrideIntermediateParallelism) {
this.overrideIntermediateParallelism = overrideIntermediateParallelism;
public OperatorKey getSplitParent() {
return splitParent;
public void setSplitParent(OperatorKey splitParent) {
this.splitParent = splitParent;
public void setSplitter(boolean spl) {
splitter = spl;
public boolean isSplitter() {
return splitter;
public boolean isClosed() {
return closed;
public void setClosed(boolean closed) {
this.closed = closed;
// Returns true when the INDEXER bit is set in the feature BitSet.
public boolean isIndexer() {
return feature.get(OPER_FEATURE.INDEXER.ordinal());
// NOTE(review): body not visible in this view; presumably sets the INDEXER
// bit that isIndexer() reads - confirm.
public void markIndexer() {
// Returns true when the SAMPLER bit is set in the feature BitSet.
public boolean isSampler() {
return feature.get(OPER_FEATURE.SAMPLER.ordinal());
// NOTE(review): body not visible in this view; presumably sets the SAMPLER
// bit that isSampler() reads - confirm.
public void markSampler() {
// Returns true when the SAMPLE_AGGREGATOR bit is set in the feature BitSet.
public boolean isSampleAggregation() {
return feature.get(OPER_FEATURE.SAMPLE_AGGREGATOR.ordinal());
// NOTE(review): body not visible in this view; presumably sets the
// SAMPLE_AGGREGATOR bit that isSampleAggregation() reads - confirm.
public void markSampleAggregation() {
// Returns true when the SAMPLE_BASED_PARTITIONER bit is set in the feature BitSet.
public boolean isSampleBasedPartitioner() {
return feature.get(OPER_FEATURE.SAMPLE_BASED_PARTITIONER.ordinal());
// NOTE(review): body not visible in this view; presumably sets the
// SAMPLE_BASED_PARTITIONER bit that isSampleBasedPartitioner() reads - confirm.
public void markSampleBasedPartitioner() {
// Returns true when the GLOBAL_SORT bit is set in the feature BitSet.
public boolean isGlobalSort() {
return feature.get(OPER_FEATURE.GLOBAL_SORT.ordinal());
// NOTE(review): body not visible in this view; presumably sets the
// GLOBAL_SORT bit that isGlobalSort() reads - confirm.
public void markGlobalSort() {
// Returns true when the GROUPBY bit is set in the feature BitSet.
public boolean isGroupBy() {
return feature.get(OPER_FEATURE.GROUPBY.ordinal());
// NOTE(review): body not visible in this view; presumably sets the GROUPBY
// bit that isGroupBy() reads - confirm.
public void markGroupBy() {
// Returns true when the COGROUP bit is set in the feature BitSet.
public boolean isCogroup() {
return feature.get(OPER_FEATURE.COGROUP.ordinal());
// NOTE(review): body not visible in this view; presumably sets the COGROUP
// bit that isCogroup() reads - confirm.
public void markCogroup() {
// Returns true when the HASHJOIN bit is set in the feature BitSet.
public boolean isRegularJoin() {
return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
// NOTE(review): body not visible in this view; presumably sets the HASHJOIN
// bit that isRegularJoin() reads - confirm.
public void markRegularJoin() {
// Returns true when the SKEWEDJOIN bit is set in the feature BitSet.
public boolean isSkewedJoin() {
return feature.get(OPER_FEATURE.SKEWEDJOIN.ordinal());
// NOTE(review): body not visible in this view; presumably sets the
// SKEWEDJOIN bit that isSkewedJoin() reads - confirm.
public void markSkewedJoin() {
// Returns true when the LIMIT bit is set in the feature BitSet.
public boolean isLimit() {
return feature.get(OPER_FEATURE.LIMIT.ordinal());
// NOTE(review): body not visible in this view; presumably sets the LIMIT
// bit that isLimit() reads - confirm.
public void markLimit() {
// Returns true when the LIMIT_AFTER_SORT bit is set in the feature BitSet.
public boolean isLimitAfterSort() {
return feature.get(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
// NOTE(review): body not visible in this view; presumably sets the
// LIMIT_AFTER_SORT bit that isLimitAfterSort() reads - confirm.
public void markLimitAfterSort() {
// Returns true when the UNION bit is set in the feature BitSet.
public boolean isUnion() {
return feature.get(OPER_FEATURE.UNION.ordinal());
// NOTE(review): body not visible in this view; presumably sets the UNION
// bit that isUnion() reads - confirm.
public void markUnion() {
// Returns true when the DISTINCT bit is set in the feature BitSet.
public boolean isDistinct() {
return feature.get(OPER_FEATURE.DISTINCT.ordinal());
// NOTE(review): body not visible in this view; presumably sets the DISTINCT
// bit that isDistinct() reads - confirm.
public void markDistinct() {
// Returns true when the NATIVE bit is set in the feature BitSet.
public boolean isNative() {
return feature.get(OPER_FEATURE.NATIVE.ordinal());
// NOTE(review): body not visible in this view; presumably sets the NATIVE
// bit that isNative() reads - confirm.
public void markNative() {
// Returns true when the RANK_COUNTER bit is set in the feature BitSet.
public boolean isRankCounter() {
return feature.get(OPER_FEATURE.RANK_COUNTER.ordinal());
// NOTE(review): body not visible in this view; presumably sets the
// RANK_COUNTER bit that isRankCounter() reads - confirm.
public void markRankCounter() {
// Walks every OPER_FEATURE value and checks (a) whether it is listed in
// excludeFeatures - a null list excludes nothing - and (b) whether its bit is
// set in copyFrom's feature BitSet.
// NOTE(review): the statements inside both if-blocks are not visible in this
// view; presumably excluded features are skipped and the remaining set bits
// are copied into this operator's feature BitSet - confirm.
public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
for (OPER_FEATURE opf : OPER_FEATURE.values()) {
if (excludeFeatures != null && excludeFeatures.contains(opf)) {
if (copyFrom.feature.get(opf.ordinal())) {
public void setNeedEstimatedQuantile(boolean needEstimateParallelism) {
this.needEstimateParallelism = needEstimateParallelism;
public boolean isNeedEstimateParallelism() {
return needEstimateParallelism;
public boolean isUseSecondaryKey() {
return useSecondaryKey;
public void setUseSecondaryKey(boolean useSecondaryKey) {
this.useSecondaryKey = useSecondaryKey;
public List<OperatorKey> getUnionMembers() {
return vertexGroupMembers;
public List<OperatorKey> getVertexGroupMembers() {
return vertexGroupMembers;
// Lazily creates the vertexGroupMembers list on first use.
// NOTE(review): the statement that uses unionPredecessor is not visible in
// this view; presumably the key is appended to vertexGroupMembers - confirm.
public void addUnionPredecessor(OperatorKey unionPredecessor) {
if (vertexGroupMembers == null) {
vertexGroupMembers = new ArrayList<OperatorKey>();
public void setVertexGroupMembers(List<OperatorKey> vertexGroupMembers) {
this.vertexGroupMembers = vertexGroupMembers;
// Union is the only operator that uses alias vertex (VertexGroup) now. But
// more operators could be added to the list in the future.
public boolean isVertexGroup() {
return isVertexGroup;
public VertexGroupInfo getVertexGroupInfo() {
return vertexGroupInfo;
public void setVertexGroupInfo(VertexGroupInfo vertexGroup) {
this.vertexGroupInfo = vertexGroup;
this.isVertexGroup = true;
public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) {
if (this.vertexGroupStores == null) {
this.vertexGroupStores = new HashMap<OperatorKey, OperatorKey>();
this.vertexGroupStores.put(storeKey, vertexGroupKey);
public void removeVertexGroupStore(OperatorKey vertexGroupKey) {
Iterator<Entry<OperatorKey, OperatorKey>> iter = vertexGroupStores.entrySet().iterator();
while (iter.hasNext()) {
Entry<OperatorKey, OperatorKey> entry =;
if (entry.getValue().equals(vertexGroupKey)) {
public Map<OperatorKey, OperatorKey> getVertexGroupStores() {
return this.vertexGroupStores;
public String name() {
String udfStr = getUDFsAsStr();
StringBuilder sb = new StringBuilder("Tez" + "(" + requestedParallelism +
(udfStr.equals("")? "" : ",") + udfStr + ")" + " - " + mKey.toString());
return sb.toString();
// Renders name() followed by ":\n" and then either the plan dump, with each
// line prefixed via shiftStringByTabs(mp, "| "), or the text "Plan Empty"
// when the plan is null or empty.
// NOTE(review): no statement that writes the plan into baos is visible in
// this view; presumably the plan is printed into the stream before it is read
// back - confirm against the complete file.
public String toString() {
StringBuilder sb = new StringBuilder(name() + ":\n");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
if (plan!=null && !plan.isEmpty()) {
// The bytes are decoded without an explicit charset.
String mp = new String(baos.toByteArray());
sb.append(shiftStringByTabs(mp, "| "));
} else {
sb.append("Plan Empty");
return sb.toString();
// Builds a string from the UDFs set; returns the empty string when the set
// is null or empty.
// NOTE(review): the body of the for-loop is not visible in this view, so the
// exact format (separator, trailing characters) cannot be stated - confirm.
private String getUDFsAsStr() {
StringBuilder sb = new StringBuilder();
if(UDFs!=null && UDFs.size()>0){
for (String str : UDFs) {
return sb.toString();
private String shiftStringByTabs(String DFStr, String tab) {
StringBuilder sb = new StringBuilder();
String[] spl = DFStr.split("\n");
for (int i = 0; i < spl.length; i++) {
sb.delete(sb.length() - "\n".length(), sb.length());
return sb.toString();
public boolean needSegmentBelow() {
return segmentBelow;
public void setSortOrder(boolean[] sortOrder) {
if(null == sortOrder) return;
this.sortOrder = new boolean[sortOrder.length];
for(int i = 0; i < sortOrder.length; ++i) {
this.sortOrder[i] = sortOrder[i];
public boolean[] getSortOrder() {
return sortOrder;
public TezOperator getSampleOperator() {
return sampleOperator;
public void setSampleOperator(TezOperator sampleOperator) {
this.sampleOperator = sampleOperator;
public TezOperator getSortOperator() {
return sortOperator;
public void setSortOperator(TezOperator sortOperator) {
this.sortOperator = sortOperator;
protected void noCombineSmallSplits() {
combineSmallSplits = false;
public boolean combineSmallSplits() {
return combineSmallSplits;
// Lazily creates the crossKeys list on first use.
// NOTE(review): the statement that uses key is not visible in this view;
// presumably the key is appended to crossKeys - confirm.
public void addCrossKey(String key) {
if (crossKeys == null) {
crossKeys = new ArrayList<String>();
public List<String> getCrossKeys() {
return crossKeys;
public boolean isUseMRMapSettings() {
return useMRMapSettings;
public void setUseMRMapSettings(boolean useMRMapSettings) {
this.useMRMapSettings = useMRMapSettings;
public int getVertexParallelism() {
return vertexParallelism;
public void setVertexParallelism(int vertexParallelism) {
this.vertexParallelism = vertexParallelism;
public LoaderInfo getLoaderInfo() {
return loaderInfo;
public long getTotalInputFilesSize() {
return totalInputFilesSize;
public void setTotalInputFilesSize(long totalInputFilesSize) {
this.totalInputFilesSize = totalInputFilesSize;
public void setUseGraceParallelism(boolean useGraceParallelism) {
this.useGraceParallelism = useGraceParallelism;
public boolean isUseGraceParallelism() {
return useGraceParallelism;
// Returns the parallelism factor of this operator with respect to the given
// successor. Factors are cached per successor OperatorKey; the cache map is
// allocated on the first call and each factor is computed at most once.
public double getParallelismFactor(TezOperator successor) throws VisitorException {
if (parallelismFactorPerSuccessor == null) {
parallelismFactorPerSuccessor = new HashMap<OperatorKey, Double>();
Double factor = parallelismFactorPerSuccessor.get(successor.getOperatorKey());
if (factor == null) {
// We determine different parallelism factors for different successors (edges).
// For eg: If we have two successors, one with combine plan and other without
// we want to compute lesser parallelism factor for the one with the combine plan
// as that edge will get less data.
// TODO: To be more perfect, we need only look at the split sub-plan that
// writes to that successor edge. If there is a FILTER in one sub-plan it is accounted
// for all the successors now which is not right.
// NOTE(review): no call that runs the visitor is visible between its
// construction and getFactor(); confirm whether the constructor performs the
// traversal or a statement is missing from this view.
TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(this, successor);
factor = parallelismFactorVisitor.getFactor();
parallelismFactorPerSuccessor.put(successor.getOperatorKey(), factor);
// The cached Double is unboxed to the primitive return type here.
return factor;
public Boolean isIntermediateReducer() throws IOException {
if (intermediateReducer == null) {
intermediateReducer = false;
// set intermediateReducer to true if are no loads or stores in a TezOperator
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
// Not map and not final reducer
if (stores.size() <= 0 &&
(getLoaderInfo().getLoads() == null || getLoaderInfo().getLoads().size() <= 0)) {
intermediateReducer = true;
return intermediateReducer;
public void setIntermediateReducer(Boolean intermediateReducer) {
this.intermediateReducer = intermediateReducer;
// Holder for the state attached to a vertex group: the input operator keys,
// an output key, an optional POStore with its output descriptor, the Tez
// VertexGroup object and a FileSpec.
public static class VertexGroupInfo {
// Created lazily by addInput(); null until then.
private List<OperatorKey> inputKeys;
private String outputKey;
private POStore store;
private OutputDescriptor storeOutDescriptor;
private VertexGroup vertexGroup;
private FileSpec sFile;
// Creates an instance with every field left at its default (null).
public VertexGroupInfo() {
// NOTE(review): the statement on the next line is truncated in this view
// ("= store;" has lost its left-hand side), and further statements of this
// constructor may be missing; presumably it assigns the store field - confirm
// against the complete file.
public VertexGroupInfo(POStore store) { = store;
public List<OperatorKey> getInputs() {
return inputKeys;
// Lazily creates the inputKeys list on first use.
// NOTE(review): the statement that uses input is not visible in this view;
// presumably the key is appended to inputKeys - confirm.
public void addInput(OperatorKey input) {
if (inputKeys == null) {
inputKeys = new ArrayList<OperatorKey>();
public boolean removeInput(OperatorKey input) {
return this.inputKeys.remove(input);
public String getOutput() {
return outputKey;
public void setOutput(String output) {
this.outputKey = output;
public POStore getStore() {
return store;
public OutputDescriptor getStoreOutputDescriptor() {
return storeOutDescriptor;
public void setStoreOutputDescriptor(OutputDescriptor storeOutDescriptor) {
this.storeOutDescriptor = storeOutDescriptor;
public VertexGroup getVertexGroup() {
return vertexGroup;
public void setVertexGroup(VertexGroup vertexGroup) {
this.vertexGroup = vertexGroup;
public void setSFile(FileSpec sFile) {
this.sFile = sFile;
public FileSpec getSFile() {
return sFile;