blob: 6ea5a8b09d49ea688760962d0bf00f5d99988c14 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.OPER_FEATURE;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
import org.apache.pig.builtin.AvroStorage;
import org.apache.pig.builtin.JsonStorage;
import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
/**
 * Optimizes union by removing the intermediate union vertex and making the
 * successor get input from the predecessor vertices directly using VertexGroup.
 * This should be run after MultiQueryOptimizer so that it handles cases like
 * union followed by split and then store.
 *
 * For example:
 * 1) Union followed by store
 * Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + Store) will be optimized to
 * Vertex 1 (Load + Store), Vertex 2 (Load + Store). Both vertices will write their output
 * directly to the same store location, which is supported by Tez.
 * 2) Union followed by group by
 * Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + POLocalRearrange) -> Vertex 4 (Group by)
 * will be optimized to Vertex 1 (Load + POLR), Vertex 2 (Load + POLR) -> Vertex 4 (Group by)
 */
public class UnionOptimizer extends TezOpPlanVisitor {

    private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);

    /**
     * StoreFunc class names that are accepted for union optimization even when they
     * are absent from the configured supported list.
     */
    private static final Set<String> builtinSupportedStoreFuncs = new HashSet<String>();

    private final TezOperPlan tezPlan;
    private final List<String> supportedStoreFuncs;
    private final List<String> unsupportedStoreFuncs;

    static {
        builtinSupportedStoreFuncs.add(PigStorage.class.getName());
        builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
        builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
        builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
        builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
        builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
        builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
        builtinSupportedStoreFuncs.add(Storage.class.getName());
    }

    /**
     * @param plan Tez operator plan that is rewritten in place
     * @param supportedStoreFuncs fully qualified StoreFunc class names allowed for the
     *        optimization; may be null
     * @param unsupportedStoreFuncs fully qualified StoreFunc class names for which the
     *        optimization is disabled; may be null
     */
    public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) {
        super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
        tezPlan = plan;
        this.supportedStoreFuncs = supportedStoreFuncs;
        this.unsupportedStoreFuncs = unsupportedStoreFuncs;
    }

    /**
     * Checks operator-level properties that prevent removing the union vertex.
     *
     * @param tezOp union operator
     * @return false for a single-parallelism limit, an explicit PARALLEL clause,
     *         or a rank counter; true otherwise
     * @throws VisitorException declared for callers; not thrown by this implementation
     */
    public static boolean isOptimizable(TezOperator tezOp) throws VisitorException {
        if ((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
            return false;
        }
        // If user has specified a PARALLEL clause with the union operator
        // turn off union optimization
        if (tezOp.getRequestedParallelism() != -1) {
            return false;
        }
        // Two vertices separately ranking with 1 to n and writing to output directly
        // will make each rank repeat twice, which is wrong. Rank always needs to be
        // done from a single vertex to have the counting correct.
        if (tezOp.isRankCounter()) {
            return false;
        }
        return true;
    }

    /**
     * Checks whether every store in the operator's plan can have multiple vertices
     * writing to its location.
     *
     * An explicit answer from {@link StoreFunc#supportsParallelWriteToStoreLocation()}
     * wins. Otherwise a store is rejected if it is in {@code unsupportedStoreFuncs}, or if
     * {@code supportedStoreFuncs} is non-null and neither it nor the builtin list contains it.
     *
     * @param tezOp operator whose physical plan is scanned for POStoreTez
     * @param supportedStoreFuncs configured supported StoreFunc class names; may be null
     * @param unsupportedStoreFuncs configured unsupported StoreFunc class names; may be null
     * @return true if all stores are acceptable
     * @throws VisitorException if walking the physical plan fails
     */
    public static boolean isOptimizableStoreFunc(TezOperator tezOp,
            List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
            throws VisitorException {
        List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
        for (POStoreTez store : stores) {
            String name = store.getStoreFunc().getClass().getName();
            if (store.getStoreFunc() instanceof StoreFunc) {
                // Evaluate once; a null answer means "not stated" and falls through.
                Boolean parallelWrite = ((StoreFunc) store.getStoreFunc()).supportsParallelWriteToStoreLocation();
                if (parallelWrite != null) {
                    if (parallelWrite) {
                        continue;
                    }
                    LOG.warn(name + " does not support union optimization."
                            + " Disabling it. There will be some performance degradation.");
                    return false;
                }
            }
            // If StoreFunc does not explicitly state support, then check supported and
            // unsupported config settings.
            if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) {
                LOG.warn(name + " is configured as not supporting union optimization."
                        + " Disabling it. There will be some performance degradation.");
                return false;
            }
            if (supportedStoreFuncs != null
                    && !supportedStoreFuncs.contains(name)
                    && !builtinSupportedStoreFuncs.contains(name)) {
                LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
                        + " does not contain " + name
                        + " and so disabling union optimization. There will be some performance degradation. "
                        + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location,"
                        + " run pig with -D"
                        + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
                        + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. Refer PIG-4691");
                return false;
            }
        }
        return true;
    }

    /**
     * Removes an optimizable union vertex from the plan.
     *
     * If the union has a single distinct member (a Split feeding the union one or more
     * times), the union plan is merged into the split sub-plans and no vertex group is
     * needed. Otherwise the union plan is cloned into every member and vertex groups are
     * created (or reused) for each store and each Tez output.
     */
    @Override
    public void visitTezOp(TezOperator tezOp) throws VisitorException {
        if (!tezOp.isUnion()) {
            return;
        }

        if (!isOptimizable(tezOp)) {
            return;
        }

        TezOperator unionOp = tezOp;
        String scope = unionOp.getOperatorKey().scope;
        PhysicalPlan unionOpPlan = unionOp.plan;

        // getUnionMembers() may repeat a key once per branch that feeds the union.
        Set<OperatorKey> uniqueUnionMembers = new HashSet<OperatorKey>(unionOp.getUnionMembers());
        List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
        List<TezOperator> unionSuccessors = tezPlan.getSuccessors(unionOp);
        List<TezOperator> successors = unionSuccessors == null ? null
                : new ArrayList<TezOperator>(unionSuccessors);

        if (uniqueUnionMembers.size() != 1) {

            if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
                return;
            }

            if (successors != null) {
                for (TezOperator succ : successors) {
                    for (TezOperator pred : predecessors) {
                        if (succ.inEdges.containsKey(pred.getOperatorKey())) {
                            // Stop here, we cannot convert the node into a vertex group.
                            // Otherwise, we will end up with a parallel edge between pred
                            // and succ
                            return;
                        }
                    }
                }
            }

            // TODO: PIG-3856 Handle replicated join and skewed join sample.
            // Replicate join small table/skewed join sample that was broadcast to union vertex
            // now needs to be broadcast to all the union predecessors. How do we do that??
            // Wait for shared edge and do it or write multiple times??
            // For now don't optimize except in the case of Split where we need to write only once.
            // Compare against the distinct members: the raw member list can contain duplicates,
            // which would hide a non-member (broadcast) predecessor from this check.
            if (predecessors.size() > uniqueUnionMembers.size()) {
                return;
            }
        }

        if (uniqueUnionMembers.size() == 1) {
            // We actually don't need VertexGroup in this case. The multiple
            // sub-plans of Split can write to same MROutput or the Tez LogicalOutput
            OperatorKey splitPredKey = uniqueUnionMembers.iterator().next();
            TezOperator splitPredOp = tezPlan.getOperator(splitPredKey);
            PhysicalPlan splitPredPlan = splitPredOp.plan;
            if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It has to be. But check anyways

                for (TezOperator op : predecessors) {
                    if (!op.getOperatorKey().equals(splitPredKey)) {
                        Set<TezOperator> allNonMemberPredecessorsAncestors = new HashSet<TezOperator>();
                        TezCompilerUtil.addAllPredecessors(tezPlan, op, allNonMemberPredecessorsAncestors);
                        // If any of the nonMemberPredecessor's ancestors (recursive predecessors)
                        // is the single union member, stop the merge effort to avoid creating
                        // an illegal loop.
                        if (allNonMemberPredecessorsAncestors.contains(splitPredOp)) {
                            return;
                        }
                    }
                }

                try {
                    connectUnionNonMemberPredecessorsToSplit(unionOp, splitPredOp, predecessors);

                    // Remove POShuffledValueInputTez from union plan root
                    unionOpPlan.remove(unionOpPlan.getRoots().get(0));
                    // Clone union plan into split subplans, once per split branch feeding the union
                    for (int i = 0; i < Collections.frequency(unionOp.getUnionMembers(), splitPredKey); i++) {
                        cloneAndMergeUnionPlan(unionOp, splitPredOp);
                    }
                    copyOperatorProperties(splitPredOp, unionOp);
                    tezPlan.disconnect(splitPredOp, unionOp);

                    connectSplitOpToUnionSuccessors(unionOp, splitPredOp, successors);

                } catch (PlanException e) {
                    throw new VisitorException(e);
                }

                //Remove union operator from the plan
                tezPlan.remove(unionOp);
                return;
            } else {
                throw new VisitorException("Expected POSplit but found " + splitPredPlan.getLeaves().get(0));
            }
        }

        // Create vertex group operator for each store. Union followed by Split
        // followed by Store could have multiple stores
        List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
        TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()];
        for (int i = 0; i < storeVertexGroupOps.length; i++) {
            TezOperator existingVertexGroup = null;
            if (successors != null) {
                for (TezOperator succ : successors) {
                    if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) {
                        existingVertexGroup = succ;
                        break;
                    }
                }
            }
            if (existingVertexGroup == null) {
                // In the case of union + split + union + store, the different stores in the Split
                // will be writing to same location after second union operator is optimized.
                // So while optimizing the first union, we should just make it write to one vertex group
                for (int j = 0; j < i; j++) {
                    if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) {
                        storeVertexGroupOps[i] = storeVertexGroupOps[j];
                        break;
                    }
                }
                if (storeVertexGroupOps[i] != null) {
                    continue;
                }
                storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
                storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
                storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
                storeVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
                tezPlan.add(storeVertexGroupOps[i]);
            } else {
                // Reuse a vertex group created by an already-optimized downstream union:
                // replace this union with its members.
                storeVertexGroupOps[i] = existingVertexGroup;
                existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
                existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
                existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
            }
        }

        // Create vertex group operator for each output. Case of split, orderby,
        // skewed join, rank, etc will have multiple outputs
        List<TezOutput> unionOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
        // One TezOutput can write to multiple LogicalOutputs (POCounterTez, POValueOutputTez, etc)
        List<String> unionOutputKeys = new ArrayList<String>();
        for (TezOutput output : unionOutputs) {
            if (output instanceof POStoreTez) {
                continue;
            }
            for (String key : output.getTezOutputs()) {
                unionOutputKeys.add(key);
            }
        }

        TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
        // newOutputKeys[i] is left null for a duplicate key i; lookups always use the
        // first occurrence via indexOf, which is populated.
        String[] newOutputKeys = new String[unionOutputKeys.size()];
        for (int i = 0; i < outputVertexGroupOps.length; i++) {
            TezOperator existingVertexGroup = null;
            if (successors != null) {
                for (TezOperator succ : successors) {
                    if (succ.isVertexGroup()
                            && unionOutputKeys.get(i).equals(succ.getVertexGroupInfo().getOutput())) {
                        existingVertexGroup = succ;
                        break;
                    }
                }
            }
            if (existingVertexGroup == null) {
                for (int j = 0; j < i; j++) {
                    if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
                        outputVertexGroupOps[i] = outputVertexGroupOps[j];
                        break;
                    }
                }
                if (outputVertexGroupOps[i] != null) {
                    continue;
                }
                outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
                outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
                outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
                outputVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
            } else {
                outputVertexGroupOps[i] = existingVertexGroup;
                existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
                existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
                existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
            }
            newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
            tezPlan.add(outputVertexGroupOps[i]);
        }

        // Change plan from Predecessors -> Union -> Successor(s) to
        // Predecessors -> Vertex Group(s) -> Successor(s)
        try {
            // Remove POShuffledValueInputTez from union plan root
            unionOpPlan.remove(unionOpPlan.getRoots().get(0));

            for (OperatorKey predKey : unionOp.getUnionMembers()) {
                TezOperator pred = tezPlan.getOperator(predKey);
                PhysicalPlan clonePlan = cloneAndMergeUnionPlan(unionOp, pred);
                connectPredecessorsToVertexGroups(unionOp, pred, clonePlan,
                        storeVertexGroupOps, outputVertexGroupOps);
            }

            connectVertexGroupsToSuccessors(unionOp, successors,
                    unionOutputKeys, outputVertexGroupOps);

            replaceSuccessorInputsAndDisconnect(unionOp, successors, unionOutputKeys, newOutputKeys);

            //Remove union operator from the plan
            tezPlan.remove(unionOp);
        } catch (VisitorException e) {
            throw e;
        } catch (Exception e) {
            throw new VisitorException(e);
        }
    }

    /**
     * Connect the predecessors of the union which are not members of the union
     * (usually FRJoin replicated table or SkewedJoin sample) to the Split op
     * which is the only member of the union. Disconnect those predecessors from the union.
     *
     * Replace the output keys of those predecessors with the split operator
     * key instead of the union operator key.
     *
     * @param unionOp Union operator
     * @param splitPredOp Split operator which is the only member of the union and its predecessor
     * @param unionPredecessors Predecessors of the union including the split operator
     * @throws PlanException if the plan cannot be reconnected
     * @throws VisitorException if a predecessor has no out edge to the union
     */
    private void connectUnionNonMemberPredecessorsToSplit(TezOperator unionOp,
            TezOperator splitPredOp,
            List<TezOperator> unionPredecessors) throws PlanException, VisitorException {
        String unionOpKey = unionOp.getOperatorKey().toString();
        OperatorKey splitPredKey = splitPredOp.getOperatorKey();
        for (TezOperator pred : unionPredecessors) {
            if (!pred.getOperatorKey().equals(splitPredKey)) { //Skip splitPredOp which is also a predecessor
                // Get actual predecessors if predecessor is a vertex group
                TezOperator predVertexGroup = null;
                List<TezOperator> actualPreds = new ArrayList<TezOperator>();
                if (pred.isVertexGroup()) {
                    predVertexGroup = pred;
                    for (OperatorKey opKey : pred.getVertexGroupMembers()) {
                        // There should not be multiple levels of vertex group. So no recursion required.
                        actualPreds.add(tezPlan.getOperator(opKey));
                    }
                    tezPlan.disconnect(predVertexGroup, unionOp);
                    tezPlan.connect(predVertexGroup, splitPredOp);
                } else {
                    actualPreds.add(pred);
                }

                for (TezOperator actualPred : actualPreds) {
                    TezCompilerUtil.replaceOutput(actualPred, unionOpKey, splitPredKey.toString());

                    TezEdgeDescriptor edge = actualPred.outEdges.remove(unionOp.getOperatorKey());
                    if (edge == null) {
                        throw new VisitorException("Edge description is empty");
                    }
                    actualPred.outEdges.put(splitPredKey, edge);
                    splitPredOp.inEdges.put(actualPred.getOperatorKey(), edge);

                    if (predVertexGroup == null) {
                        // Disconnect FRJoin table/SkewedJoin sample edge to
                        // union op and connect to POSplit
                        tezPlan.disconnect(actualPred, unionOp);
                        tezPlan.connect(actualPred, splitPredOp);
                    }
                }
            }
        }
    }

    /**
     * Connect the split operator to the successors of the union operators and update the edges.
     * Also change the inputs of the successor from the union operator to the split operator.
     *
     * A successor vertex group whose only remaining member is the split operator is removed
     * and the split operator is connected to the group's successors directly.
     *
     * @param unionOp Union operator
     * @param splitPredOp Split operator which is the only member of the union
     * @param successors Successors of the union operator; may be null
     * @throws PlanException if the plan cannot be reconnected
     * @throws VisitorException if a successor has no in edge from the union
     */
    private void connectSplitOpToUnionSuccessors(TezOperator unionOp,
            TezOperator splitPredOp, List<TezOperator> successors)
            throws PlanException, VisitorException {
        String unionOpKey = unionOp.getOperatorKey().toString();
        String splitPredOpKey = splitPredOp.getOperatorKey().toString();
        List<TezOperator> splitSuccessors = tezPlan.getSuccessors(splitPredOp);
        if (successors != null) {
            for (TezOperator succ : successors) {
                TezOperator successorVertexGroup = null;
                boolean removeSuccessorVertexGroup = false;
                List<TezOperator> actualSuccs = new ArrayList<TezOperator>();
                if (succ.isVertexGroup()) {
                    successorVertexGroup = succ;
                    if (tezPlan.getSuccessors(successorVertexGroup) != null) {
                        // There should not be multiple levels of vertex group. So no recursion required.
                        actualSuccs.addAll(tezPlan.getSuccessors(successorVertexGroup));
                    }
                    // Replace every occurrence of the union in the group's members with the split op
                    int index = succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
                    while (index > -1) {
                        succ.getVertexGroupMembers().set(index, splitPredOp.getOperatorKey());
                        index = succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
                    }

                    // Store vertex group
                    POStore store = successorVertexGroup.getVertexGroupInfo().getStore();
                    if (store != null) {
                        //Clone changes the operator keys
                        List<POStoreTez> storeOutputs = PlanHelper.getPhysicalOperators(splitPredOp.plan, POStoreTez.class);
                        for (POStoreTez storeOut : storeOutputs) {
                            if (storeOut.getOutputKey().equals(store.getOperatorKey().toString())) {
                                splitPredOp.addVertexGroupStore(storeOut.getOperatorKey(), successorVertexGroup.getOperatorKey());
                            }
                        }
                    }

                    tezPlan.disconnect(unionOp, successorVertexGroup);
                    Set<OperatorKey> uniqueVertexGroupMembers = new HashSet<OperatorKey>(succ.getVertexGroupMembers());
                    if (uniqueVertexGroupMembers.size() == 1) {
                        //Only splitPredOp is member of the vertex group. Get rid of the vertex group
                        removeSuccessorVertexGroup = true;
                    } else {
                        // Avoid connecting multiple times in case of union + self join
                        if (splitSuccessors == null || !splitSuccessors.contains(successorVertexGroup)) {
                            tezPlan.connect(splitPredOp, successorVertexGroup);
                        }
                    }

                } else {
                    actualSuccs.add(succ);
                }

                // Store vertex group with no successors that is no longer needed
                if (actualSuccs.isEmpty() && removeSuccessorVertexGroup) {
                    splitPredOp.removeVertexGroupStore(successorVertexGroup.getOperatorKey());
                    tezPlan.remove(successorVertexGroup);
                }

                for (TezOperator actualSucc : actualSuccs) {
                    TezCompilerUtil.replaceInput(actualSucc, unionOpKey, splitPredOpKey);

                    TezEdgeDescriptor edge = actualSucc.inEdges.remove(unionOp.getOperatorKey());
                    if (edge == null) {
                        throw new VisitorException("Edge description is empty");
                    }
                    actualSucc.inEdges.put(splitPredOp.getOperatorKey(), edge);
                    splitPredOp.outEdges.put(actualSucc.getOperatorKey(), edge);

                    if (successorVertexGroup == null || removeSuccessorVertexGroup) {
                        if (removeSuccessorVertexGroup) {
                            // Changes plan from SplitOp -> Union -> VertexGroup - > Successor
                            // to SplitOp -> Successor
                            tezPlan.disconnect(successorVertexGroup, actualSucc);
                            tezPlan.remove(successorVertexGroup);
                            TezCompilerUtil.replaceInput(actualSucc, successorVertexGroup.getOperatorKey().toString(), splitPredOpKey);
                        } else {
                            // Changes plan from SplitOp -> Union -> Successor
                            // to SplitOp -> Successor
                            tezPlan.disconnect(unionOp, actualSucc);
                        }
                        // Avoid connecting multiple times in case of union + self join
                        if (splitSuccessors == null || !splitSuccessors.contains(actualSucc)) {
                            tezPlan.connect(splitPredOp, actualSucc);
                        }
                    }
                }
            }
        }
    }

    /**
     * Clone the plan of the union and merge it into the predecessor operator.
     *
     * If the predecessor's leaf is a POSplit, the union plan is merged into the split
     * sub-plan whose POValueOutputTez writes to the union. The predecessor's leaf
     * (in that sub-plan) is removed and the cloned union plan is attached in its place.
     *
     * @param unionOp Union operator
     * @param predOp Predecessor operator of union to which union plan should be merged to
     * @return the cloned union plan that was merged into the predecessor
     * @throws VisitorException wrapping any failure during clone or merge
     */
    private PhysicalPlan cloneAndMergeUnionPlan(TezOperator unionOp, TezOperator predOp) throws VisitorException {
        try {
            PhysicalPlan predPlan = predOp.plan;
            PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
            // If the predecessor ends in a POSplit, operate on the sub-plan feeding the union
            if (predLeaf instanceof POSplit) {
                // Find the subPlan that connects to the union operator
                predPlan = getUnionPredPlanFromSplit(predPlan, unionOp.getOperatorKey().toString());
                predLeaf = predPlan.getLeaves().get(0);
            }

            PhysicalPlan clonePlan = unionOp.plan.clone();
            // Remove POValueOutputTez from predecessor leaf
            predPlan.remove(predLeaf);
            boolean isEmptyPlan = predPlan.isEmpty();
            if (!isEmptyPlan) {
                predLeaf = predPlan.getLeaves().get(0);
            }
            predPlan.merge(clonePlan);
            if (!isEmptyPlan) {
                predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
            }
            return clonePlan;
        } catch (Exception e) {
            throw new VisitorException(e);
        }
    }

    /**
     * Connects the unionOp predecessor to the store vertex groups and the output vertex groups
     * and disconnects it from the unionOp.
     *
     * @param unionOp Union operator
     * @param pred Predecessor of union which will be made part of the vertex group
     * @param predClonedUnionPlan Cloned plan of the union merged to the predecessor
     * @param storeVertexGroupOps Store vertex groups to connect to, in the same order as the
     *        POStoreTez operators of the cloned union plan
     * @param outputVertexGroupOps Tez LogicalOutput vertex groups to connect to
     * @throws VisitorException if walking the cloned plan fails or operator properties conflict
     * @throws PlanException if the plan cannot be reconnected
     */
    public void connectPredecessorsToVertexGroups(TezOperator unionOp,
            TezOperator pred, PhysicalPlan predClonedUnionPlan,
            TezOperator[] storeVertexGroupOps,
            TezOperator[] outputVertexGroupOps) throws VisitorException, PlanException {
        //Clone changes the operator keys
        List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(predClonedUnionPlan, POStoreTez.class);

        // Connect predecessor to the storeVertexGroups
        int i = 0;
        for (TezOperator storeVertexGroup : storeVertexGroupOps) {
            // Skip connecting if they are already connected. Can happen in case of
            // union + split + union + store. Because of the split all the stores
            // will be writing to same location
            List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs();
            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
                tezPlan.connect(pred, storeVertexGroup);
            }
            storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
            pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
                    storeVertexGroup.getOperatorKey());
        }

        for (TezOperator outputVertexGroup : outputVertexGroupOps) {
            List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs();
            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
                tezPlan.connect(pred, outputVertexGroup);
            }
            outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
        }

        copyOperatorProperties(pred, unionOp);
        tezPlan.disconnect(pred, unionOp);
    }

    /**
     * Connect vertexgroup operator to successor operator in the plan.
     *
     * Copy the output edge between union operator and successor to between
     * predecessors and successor. Predecessor output key and output edge points
     * to successor so that we have all the edge configuration, but they are
     * connected to the vertex group in the plan.
     *
     * @param unionOp Union operator
     * @param successors Successors of the union operator (currently unused)
     * @param unionOutputKeys Output keys of union
     * @param outputVertexGroupOps Tez LogicalOutput vertex groups corresponding to the output keys
     *
     * @throws PlanException if an out edge has no matching output key or the plan
     *         cannot be reconnected
     */
    private void connectVertexGroupsToSuccessors(TezOperator unionOp,
            List<TezOperator> successors, List<String> unionOutputKeys,
            TezOperator[] outputVertexGroupOps) throws PlanException {
        // Connect to outputVertexGroupOps
        for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
            TezOperator succOp = tezPlan.getOperator(entry.getKey());
            TezEdgeDescriptor edge = entry.getValue();

            int outputIndex = unionOutputKeys.indexOf(entry.getKey().toString());
            if (outputIndex < 0) {
                // Fail with context instead of an ArrayIndexOutOfBoundsException below
                throw new PlanException("Union " + unionOp.getOperatorKey()
                        + " has an out edge to " + entry.getKey()
                        + " but no matching Tez output key");
            }

            // Edge cannot be one to one as it will get input from two or
            // more union predecessors. Change it to SCATTER_GATHER
            if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                edge.dataMovementType = DataMovementType.SCATTER_GATHER;
                edge.partitionerClass = HashValuePartitioner.class;
                edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
                edge.inputClassName = UnorderedKVInput.class.getName();
            }
            TezOperator vertexGroupOp = outputVertexGroupOps[outputIndex];
            for (OperatorKey predKey : unionOp.getUnionMembers()) {
                TezOperator pred = tezPlan.getOperator(predKey);
                // Keep the output edge directly to successor
                // Don't need to keep output edge for vertexgroup
                pred.outEdges.put(entry.getKey(), edge);
                succOp.inEdges.put(predKey, edge);
            }
            if (!tezPlan.pathExists(vertexGroupOp, succOp)) {
                tezPlan.connect(vertexGroupOp, succOp);
            }
        }
    }

    /**
     * Point every successor input (including ReadScalarsTez UDF inputs) that read from
     * the union at the corresponding output vertex group instead, then disconnect the
     * union from each successor.
     *
     * @param unionOp Union operator
     * @param successors Successors of the union operator; may be null
     * @param unionOutputKeys Output keys of union
     * @param newOutputKeys Vertex group keys, indexed like {@code unionOutputKeys}
     * @throws VisitorException if a successor reading the union has no matching output key
     */
    private void replaceSuccessorInputsAndDisconnect(TezOperator unionOp,
            List<TezOperator> successors,
            List<String> unionOutputKeys,
            String[] newOutputKeys)
            throws VisitorException {
        if (successors != null) {
            String unionOpKey = unionOp.getOperatorKey().toString();
            // Successor inputs should now point to the vertex groups.
            for (TezOperator succ : successors) {
                LinkedList<TezInput> inputs = PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
                for (TezInput input : inputs) {
                    for (String key : input.getTezInputs()) {
                        if (key.equals(unionOpKey)) {
                            input.replaceInput(key,
                                    newOutputKeyFor(succ, unionOutputKeys, newOutputKeys));
                        }
                    }
                }

                List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class);
                for (POUserFunc userFunc : userFuncs) {
                    if (userFunc.getFunc() instanceof ReadScalarsTez) {
                        TezInput tezInput = (TezInput) userFunc.getFunc();
                        for (String inputKey : tezInput.getTezInputs()) {
                            if (inputKey.equals(unionOpKey)) {
                                tezInput.replaceInput(inputKey,
                                        newOutputKeyFor(succ, unionOutputKeys, newOutputKeys));
                                // Keep the serialized constructor args in sync with the new input
                                userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
                            }
                        }
                    }
                }

                tezPlan.disconnect(unionOp, succ);
            }
        }
    }

    /**
     * Look up the vertex group key that replaces the union as input of {@code succ}.
     *
     * @throws VisitorException if the successor's key is not one of the union's output keys
     */
    private static String newOutputKeyFor(TezOperator succ, List<String> unionOutputKeys,
            String[] newOutputKeys) throws VisitorException {
        int index = unionOutputKeys.indexOf(succ.getOperatorKey().toString());
        if (index < 0) {
            throw new VisitorException("No union output key found for successor " + succ.getOperatorKey());
        }
        return newOutputKeys[index];
    }

    /**
     * Copy map-side properties of the union operator (UDFs, scalars, cross keys,
     * features other than UNION, and the skewed join sample operator) onto the
     * operator that absorbs the union plan.
     *
     * @param pred operator receiving the properties
     * @param unionOp union operator being removed
     * @throws VisitorException if both operators have different sample operators
     */
    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) throws VisitorException {
        pred.UDFs.addAll(unionOp.UDFs);
        pred.scalars.addAll(unionOp.scalars);
        // Copy only map side properties. For eg: crossKeys.
        // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc
        // Also ignore parallelism settings
        if (unionOp.getCrossKeys() != null) {
            for (String key : unionOp.getCrossKeys()) {
                pred.addCrossKey(key);
            }
        }
        pred.copyFeatures(unionOp, Arrays.asList(new OPER_FEATURE[]{OPER_FEATURE.UNION}));

        // For skewed join right input
        if (unionOp.getSampleOperator() != null) {
            if (pred.getSampleOperator() == null) {
                pred.setSampleOperator(unionOp.getSampleOperator());
            } else if (!pred.getSampleOperator().equals(unionOp.getSampleOperator())) {
                throw new VisitorException("Conflicting sample operators "
                        + pred.getSampleOperator().toString() + " and "
                        + unionOp.getSampleOperator().toString());
            }
        }
    }

    /**
     * Find the POSplit sub-plan whose POValueOutputTez leaf writes to the given union.
     *
     * @param plan physical plan containing one or more POSplit operators
     * @param unionOpKey operator key of the union, as a string
     * @return the first matching sub-plan
     * @throws VisitorException if no sub-plan writes to the union
     */
    public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {
        List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
        for (POSplit split : splits) {
            for (PhysicalPlan subPlan : split.getPlans()) {
                if (subPlan.getLeaves().get(0) instanceof POValueOutputTez) {
                    POValueOutputTez out = (POValueOutputTez) subPlan.getLeaves().get(0);
                    if (out.containsOutputKey(unionOpKey)) {
                        return subPlan;
                    }
                }
            }
        }
        throw new VisitorException("Did not find the union predecessor in the split plan for union " + unionOpKey);
    }
}