blob: 99d88579f7000508f034dc10d8b89d16884b89cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.util;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
import com.google.common.collect.Lists;
/**
 * Static helpers used while compiling a Pig physical plan into a Tez DAG: connecting
 * {@link TezOperator}s with {@link TezEdgeDescriptor}s, rewriting input/output keys when a
 * vertex is re-wired, and building a few frequently needed physical operators.
 */
public final class TezCompilerUtil {

    /**
     * Name of the tuple implementation class returned by {@link TupleFactory#getInstance()}
     * when this class is initialised. Used as the intermediate value class of value-only edges.
     */
    public static final String TUPLE_CLASS = TupleFactory.getInstance().tupleClass().getName();

    private TezCompilerUtil() {
        // Utility class; not instantiable.
    }

    /**
     * Ends the vertex {@code op1} (equivalent to a map) and starts the vertex {@code op2}
     * (equivalent to a reduce), then connects them in {@code tezPlan}.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>{@code op1} gets a trivial {@link POLocalRearrangeTez} appended as its leaf. It has
     *       index 0, a project(*) key of type tuple, and its output key set to {@code op2}.</li>
     *   <li>{@code op2} gets a trivial {@link POPackage} with one non-inner input, followed by a
     *       plain {@code ForEach generate flatten($1)}.</li>
     *   <li>{@link #connect(TezOperPlan, TezOperator, TezOperator)} adds the plan edge and a
     *       default {@link TezEdgeDescriptor}, so callers do NOT need to connect the two
     *       operators themselves.</li>
     * </ol>
     *
     * <p>NOTE(review): presumably {@code op1.plan} must accept a new leaf and {@code op2.plan}
     * must be empty, since the package is added as its first operator. Confirm against callers.
     *
     * @param tezPlan plan containing both operators
     * @param op1 upstream (map-like) operator
     * @param op2 downstream (reduce-like) operator
     * @param scope scope used for new operator keys
     * @param nig generator for new operator ids
     * @throws PlanException if the new local rearrange cannot be indexed, or plan wiring fails
     */
    public static void simpleConnectTwoVertex(TezOperPlan tezPlan, TezOperator op1, TezOperator op2,
            String scope, NodeIdGenerator nig) throws PlanException {
        // Key plan: project(*), so the whole tuple is the rearrange key.
        PhysicalPlan ep = new PhysicalPlan();
        POProject prjStar = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
        prjStar.setResultType(DataType.TUPLE);
        prjStar.setStar(true);
        ep.add(prjStar);
        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
        eps.add(ep);

        POLocalRearrangeTez lr = new POLocalRearrangeTez(
                new OperatorKey(scope, nig.getNextNodeId(scope)));
        try {
            lr.setIndex(0);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on the newly created POLocalRearrange.";
            throw new PlanException(msg, errCode, PigException.BUG, e);
        }
        lr.setKeyType(DataType.TUPLE);
        lr.setPlans(eps);
        lr.setResultType(DataType.TUPLE);
        lr.setOutputKey(op2.getOperatorKey().toString());
        op1.plan.addAsLeaf(lr);

        // Receiving side: a single, non-inner input.
        POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
        pkg.getPkgr().setKeyType(DataType.TUPLE);
        pkg.setNumInps(1);
        boolean[] inner = {false};
        pkg.getPkgr().setInner(inner);
        op2.plan.add(pkg);
        op2.plan.addAsLeaf(getForEachPlain(scope, nig));

        connect(tezPlan, op1, op2);
    }

    /**
     * Connects {@code from} to {@code to} in {@code plan} using a new, default
     * {@link TezEdgeDescriptor}.
     *
     * <p>If the first leaf of {@code from}'s plan is a {@link POLocalRearrangeTez}, its output
     * key is pointed at {@code to}. Other leaves are left untouched; for example, the leaf can
     * be a POStoreTez in the sampling job of an order by.
     *
     * @return the newly created edge descriptor, registered on both operators
     * @throws PlanException if the plan edge cannot be added
     */
    public static TezEdgeDescriptor connect(TezOperPlan plan, TezOperator from, TezOperator to)
            throws PlanException {
        plan.connect(from, to);
        if (!from.plan.isEmpty()) {
            PhysicalOperator leaf = from.plan.getLeaves().get(0);
            if (leaf instanceof POLocalRearrangeTez) {
                ((POLocalRearrangeTez) leaf).setOutputKey(to.getOperatorKey().toString());
            }
        }
        TezEdgeDescriptor edge = new TezEdgeDescriptor();
        registerEdge(from, to, edge);
        return edge;
    }

    /**
     * Connects {@code from} to {@code to} in {@code plan} and registers the supplied
     * {@code edge} on both operators. Unlike the 3-argument overload, physical operators in
     * {@code from}'s plan are not modified.
     *
     * @throws PlanException if the plan edge cannot be added
     */
    public static void connect(TezOperPlan plan, TezOperator from, TezOperator to,
            TezEdgeDescriptor edge) throws PlanException {
        plan.connect(from, to);
        registerEdge(from, to, edge);
    }

    /** Records {@code edge} as the in-edge of {@code to} and the out-edge of {@code from}. */
    private static void registerEdge(TezOperator from, TezOperator to, TezEdgeDescriptor edge) {
        to.inEdges.put(from.getOperatorKey(), edge);
        from.outEdges.put(to.getOperatorKey(), edge);
    }

    /**
     * Connects {@code newPredecessor -> tezOp} with {@code edge}. If {@code oldInputKey} is
     * non-null, every input reference to it inside {@code tezOp} is rewritten to
     * {@code newPredecessor}'s key (see {@link #replaceInput}).
     *
     * @throws PlanException if wiring or the input rewrite fails
     */
    public static void connectTezOpToNewPredecessor(TezOperPlan plan,
            TezOperator tezOp, TezOperator newPredecessor,
            TezEdgeDescriptor edge, String oldInputKey) throws PlanException {
        connect(plan, newPredecessor, tezOp, edge);
        if (oldInputKey != null) {
            replaceInput(tezOp, oldInputKey, newPredecessor.getOperatorKey().toString());
        }
    }

    /**
     * Replaces {@code oldInputKey} with {@code newInputKey} in every input reference within
     * {@code tezOp}'s plan. This covers {@link TezInput} physical operators and
     * {@link ReadScalarsTez} UDFs. For the UDFs, the owning {@link POUserFunc}'s FuncSpec
     * constructor arguments are also reset to the UDF's updated input list.
     *
     * @throws PlanException wrapping any {@link VisitorException} raised while walking the plan
     */
    public static void replaceInput(TezOperator tezOp, String oldInputKey,
            String newInputKey) throws PlanException {
        try {
            List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
            for (TezInput input : inputs) {
                input.replaceInput(oldInputKey, newInputKey);
            }
            List<POUserFunc> userFuncs =
                    PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
            for (POUserFunc userFunc : userFuncs) {
                if (userFunc.getFunc() instanceof ReadScalarsTez) {
                    TezInput input = (TezInput) userFunc.getFunc();
                    input.replaceInput(oldInputKey, newInputKey);
                    userFunc.getFuncSpec().setCtorArgs(input.getTezInputs());
                }
            }
        } catch (VisitorException e) {
            throw new PlanException(e);
        }
    }

    /**
     * Connects {@code tezOp -> newSuccessor} with {@code edge}. If {@code oldOutputKey} is
     * non-null, every output reference to it inside {@code tezOp} is rewritten to
     * {@code newSuccessor}'s key (see {@link #replaceOutput}).
     *
     * @throws PlanException if wiring or the output rewrite fails
     */
    public static void connectTezOpToNewSuccesor(TezOperPlan plan,
            TezOperator tezOp, TezOperator newSuccessor,
            TezEdgeDescriptor edge, String oldOutputKey) throws PlanException {
        connect(plan, tezOp, newSuccessor, edge);
        if (oldOutputKey != null) {
            replaceOutput(tezOp, oldOutputKey, newSuccessor.getOperatorKey().toString());
        }
    }

    /**
     * Replaces {@code oldOutputKey} with {@code newOutputKey} on every {@link TezOutput} in
     * {@code tezOp}'s plan whose output list contains {@code oldOutputKey}.
     *
     * @throws PlanException wrapping any {@link VisitorException} raised while walking the plan
     */
    public static void replaceOutput(TezOperator tezOp, String oldOutputKey,
            String newOutputKey) throws PlanException {
        try {
            List<TezOutput> tezOutputs =
                    PlanHelper.getPhysicalOperators(tezOp.plan, TezOutput.class);
            for (TezOutput tezOut : tezOutputs) {
                if (ArrayUtils.contains(tezOut.getTezOutputs(), oldOutputKey)) {
                    tezOut.replaceOutput(oldOutputKey, newOutputKey);
                }
            }
        } catch (VisitorException e) {
            throw new PlanException(e);
        }
    }

    /**
     * Returns whether {@code inputKey} is read directly in {@code tezOp}'s plan by a
     * {@link POFRJoinTez} or by a {@link ReadScalarsTez} UDF, rather than through the package.
     *
     * @throws PlanException wrapping any {@link VisitorException} raised while walking the plan
     */
    public static boolean isNonPackageInput(String inputKey, TezOperator tezOp)
            throws PlanException {
        try {
            List<POFRJoinTez> inputs =
                    PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class);
            for (POFRJoinTez input : inputs) {
                if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
                    return true;
                }
            }
            List<POUserFunc> userFuncs =
                    PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
            for (POUserFunc userFunc : userFuncs) {
                if (userFunc.getFunc() instanceof ReadScalarsTez) {
                    TezInput input = (TezInput) userFunc.getFunc();
                    if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
                        return true;
                    }
                }
            }
            return false;
        } catch (VisitorException e) {
            throw new PlanException(e);
        }
    }

    /**
     * Builds a {@link POForEach} with a single inner plan containing {@code project}, flattened.
     * The foreach's result type is {@link DataType#BAG}.
     *
     * @param project the projection forming the only inner plan
     * @param rp passed through to the POForEach constructor (presumably requested parallelism)
     */
    public static POForEach getForEach(POProject project, int rp, String scope,
            NodeIdGenerator nig) {
        PhysicalPlan forEachPlan = new PhysicalPlan();
        forEachPlan.add(project);
        List<PhysicalPlan> forEachPlans = Lists.newArrayList();
        forEachPlans.add(forEachPlan);
        List<Boolean> flatten = Lists.newArrayList();
        flatten.add(true);
        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp,
                forEachPlans, flatten);
        forEach.setResultType(DataType.BAG);
        return forEach;
    }

    /**
     * Builds the plain foreach {@code ForEach X generate flatten($1)}: an overloaded tuple
     * projection of column 1, with rp = -1.
     */
    public static POForEach getForEachPlain(String scope, NodeIdGenerator nig) {
        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
        project.setResultType(DataType.TUPLE);
        project.setStar(false);
        project.setColumn(1);
        project.setOverloaded(true);
        return getForEach(project, -1, scope, nig);
    }

    /** Creates a new {@link POStoreTez} marked as a temporary store. */
    public static POStore getStore(String scope, NodeIdGenerator nig) {
        POStore st = new POStoreTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
        // Mark the store as tmp: the optimizer may remove it, since the user did not request it.
        st.setIsTmpStore(true);
        return st;
    }

    /**
     * Sets the partitioner of every incoming edge of {@code tezOp}, if a custom partitioner is
     * given. The class is resolved via {@link PigContext#resolveClassName(String)}. Does nothing
     * when {@code customPartitioner} is null.
     *
     * @throws IOException if {@link PigContext#resolveClassName(String)} fails
     */
    public static void setCustomPartitioner(String customPartitioner, TezOperator tezOp)
            throws IOException {
        if (customPartitioner != null) {
            for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
                edge.partitionerClass = PigContext.resolveClassName(customPartitioner);
            }
        }
    }

    /**
     * Configures {@code edge} for value-only tuple output, as used with {@link POValueOutputTez}.
     *
     * <ul>
     *   <li>BROADCAST / ONE_TO_ONE: unordered KV output and input.</li>
     *   <li>SCATTER_GATHER: unordered partitioned KV output, unordered KV input, and
     *       {@link HashValuePartitioner}.</li>
     *   <li>Any other movement type: input/output classes are left unchanged.</li>
     * </ul>
     * The intermediate key class is always EmptyWritable and the value class is
     * {@link #TUPLE_CLASS}.
     */
    public static void configureValueOnlyTupleOutput(TezEdgeDescriptor edge,
            DataMovementType dataMovementType) {
        edge.dataMovementType = dataMovementType;
        if (dataMovementType == DataMovementType.BROADCAST
                || dataMovementType == DataMovementType.ONE_TO_ONE) {
            edge.outputClassName = UnorderedKVOutput.class.getName();
            edge.inputClassName = UnorderedKVInput.class.getName();
        } else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
            edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
            edge.inputClassName = UnorderedKVInput.class.getName();
            edge.partitionerClass = HashValuePartitioner.class;
        }
        edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
        edge.setIntermediateOutputValueClass(TUPLE_CLASS);
    }

    /**
     * Returns whether any output column of the combine plan's foreach produces a bag.
     *
     * <p>Assumes the first predecessor of the plan's first leaf is a {@link POForEach}; a
     * different operator there causes a ClassCastException. A column counts as a bag if its
     * leaf's result type is {@link DataType#BAG}. It also counts if the leaf is a
     * {@link POUserFunc} whose original return type is DataBag; this is needed because the
     * Intermediate func in a combine plan always returns Tuple.
     *
     * @throws ExecException as declared; propagated from the operators inspected
     */
    public static boolean bagDataTypeInCombinePlan(PhysicalPlan combinePlan)
            throws ExecException {
        PhysicalOperator lr = combinePlan.getLeaves().get(0);
        POForEach fe = (POForEach) combinePlan.getPredecessors(lr).get(0);
        // Hack: take the DataBag Type from TOBAG's declared return type
        // (class.getTypeName() is only available in JDK8).
        Type dataBagType = new TOBAG().getReturnType();
        for (PhysicalPlan inputPlan : fe.getInputPlans()) {
            PhysicalOperator leaf = inputPlan.getLeaves().get(0);
            if (leaf.getResultType() == DataType.BAG) {
                return true;
            } else if (leaf instanceof POUserFunc) {
                POUserFunc func = (POUserFunc) leaf;
                // Check the original (Final) EvalFunc return type, not the Intermediate one.
                if (dataBagType.equals(func.getOriginalReturnType())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Adds every transitive predecessor of {@code tezOp} in {@code tezPlan} to {@code predSet}.
     * {@code predSet} also serves as the visited set: an operator already in it is not expanded
     * again.
     */
    public static void addAllPredecessors(TezOperPlan tezPlan, TezOperator tezOp,
            Set<TezOperator> predSet) {
        List<TezOperator> preds = tezPlan.getPredecessors(tezOp);
        if (preds != null) {
            for (TezOperator pred : preds) {
                // Set.add returns true only when pred was not already present.
                if (predSet.add(pred)) {
                    addAllPredecessors(tezPlan, pred, predSet);
                }
            }
        }
    }
}