/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.api;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Stack;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezYARNUtils;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.VertexGroup.GroupInfo;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Top level entity that defines the DAG (Directed Acyclic Graph) representing
* the data flow graph. Consists of a set of Vertices and Edges connecting the
* vertices. Vertices represent transformations of data and edges represent
* movement of data between vertices.
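 *
 * <p>A minimal usage sketch (a hypothetical word-count; the processor class
 * names and {@code edgeProperty} are illustrative placeholders, not part of
 * this API):
 * <pre>{@code
 * DAG dag = DAG.create("wordcount");
 * Vertex tokenizer = Vertex.create("tokenizer",
 *     ProcessorDescriptor.create(TokenProcessor.class.getName()), 4);
 * Vertex summer = Vertex.create("summer",
 *     ProcessorDescriptor.create(SumProcessor.class.getName()), 2);
 * dag.addVertex(tokenizer)
 *     .addVertex(summer)
 *     .addEdge(Edge.create(tokenizer, summer, edgeProperty));
 * }</pre>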
*/
@Public
public class DAG {
private static final Log LOG = LogFactory.getLog(DAG.class);
final BidiMap<String, Vertex> vertices =
new DualLinkedHashBidiMap<String, Vertex>();
final Set<Edge> edges = Sets.newHashSet();
final String name;
final Collection<URI> urisForCredentials = new HashSet<URI>();
Credentials credentials = new Credentials();
Set<VertexGroup> vertexGroups = Sets.newHashSet();
Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
private DAGAccessControls dagAccessControls;
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
String dagInfo;
private Stack<String> topologicalVertexStack = new Stack<String>();
private DAG(String name) {
this.name = name;
}
/**
* Create a DAG with the specified name.
* @param name the name of the DAG
 * @return a new {@link DAG}
*/
public static DAG create(String name) {
return new DAG(name);
}
/**
 * Set the files etc. that must be provided to the tasks of this DAG.
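 *
 * <p>A sketch of adding one such file; {@code dictUrl}, {@code dictLen} and
 * {@code dictModTime} are placeholders that would normally come from the
 * client's own upload logic:
 * <pre>{@code
 * Map<String, LocalResource> files = new HashMap<String, LocalResource>();
 * files.put("dict.txt", LocalResource.newInstance(
 *     dictUrl, LocalResourceType.FILE,
 *     LocalResourceVisibility.APPLICATION, dictLen, dictModTime));
 * dag.addTaskLocalFiles(files);
 * }</pre>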
* @param localFiles
* files that must be available locally for each task. These files
* may be regular files, archives etc. as specified by the value
* elements of the map.
* @return {@link DAG}
*/
public synchronized DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) {
Preconditions.checkNotNull(localFiles);
TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles, "DAG " + getName());
return this;
}
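  /**
   * Add a {@link Vertex} to the DAG. Vertex names must be unique within the DAG.
   * @param vertex the vertex to be added
   * @return this {@link DAG}
   */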
public synchronized DAG addVertex(Vertex vertex) {
if (vertices.containsKey(vertex.getName())) {
throw new IllegalStateException(
"Vertex " + vertex.getName() + " already defined!");
}
vertices.put(vertex.getName(), vertex);
return this;
}
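  /**
   * Get the {@link Vertex} with the given name.
   * @param vertexName the name of the vertex
   * @return the {@link Vertex}, or null if no vertex with that name exists
   */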
public synchronized Vertex getVertex(String vertexName) {
return vertices.get(vertexName);
}
/**
 * Set credentials which will be required to run this DAG. This method can be
 * used if the client has already obtained some or all of the required
 * credentials.
 *
 * This is one of the methods that can be used to provide information about
 * required credentials when running on a secure cluster. A combination of this
 * and {@link #addURIsForCredentials(Collection)} should be used to specify all
 * credentials required by a DAG. AM specific credentials are not used when
 * executing a DAG.
*
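 * <p>A hedged sketch, assuming the client fetches HDFS delegation tokens
 * itself; {@code conf} and {@code renewer} are placeholders for the client
 * configuration and renewer principal:
 * <pre>{@code
 * Credentials creds = new Credentials();
 * FileSystem fs = FileSystem.get(conf);
 * fs.addDelegationTokens(renewer, creds); // fetch HDFS delegation tokens
 * dag.setCredentials(creds);
 * }</pre>
 *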
* @param credentials Credentials for the DAG
* @return {@link DAG}
*/
public synchronized DAG setCredentials(Credentials credentials) {
this.credentials = credentials;
return this;
}
/**
* Set description info for this DAG that can be used for visualization purposes.
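 *
 * <p>For example (a sketch; the caller is responsible for serializing the
 * JSON):
 * <pre>{@code
 * dag.setDAGInfo("{ \"context\": \"Hive\", \"description\": \"select c1 from t1\" }");
 * }</pre>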
* @param dagInfo JSON blob as a serialized string.
 * Keys recognized by the UI are:
* "context" - The application context in which this DAG is being used.
* For example, this could be set to "Hive" or "Pig" if
* this is being run as part of a Hive or Pig script.
* "description" - General description on what this DAG is going to do.
* In the case of Hive, this could be the SQL query text.
* @return {@link DAG}
*/
public synchronized DAG setDAGInfo(String dagInfo) {
Preconditions.checkNotNull(dagInfo);
this.dagInfo = dagInfo;
return this;
}
/**
* Create a group of vertices that share a common output. This can be used to implement
* unions efficiently.
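 *
 * <p>A sketch of a union of two vertices feeding one consumer; the edge
 * property and merged input descriptor are illustrative placeholders:
 * <pre>{@code
 * VertexGroup union = dag.createVertexGroup("union", v1, v2);
 * dag.addEdge(GroupInputEdge.create(union, consumer, edgeProperty, mergedInput));
 * }</pre>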
* @param name Name of the group.
* @param members {@link Vertex} members of the group
 * @return the new {@link VertexGroup}
*/
public synchronized VertexGroup createVertexGroup(String name, Vertex... members) {
VertexGroup uv = new VertexGroup(name, members);
vertexGroups.add(uv);
return uv;
}
@Private
public synchronized Credentials getCredentials() {
return this.credentials;
}
/**
 * Set access controls for the DAG, i.e. which users/groups can view the DAG
 * progress/history and which users/groups can modify the DAG (e.g. kill it).
* The owner of the Tez Session and the user submitting the DAG are super-users and have access
* to all operations on the DAG.
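 *
 * <p>A hedged sketch (the user names are illustrative):
 * <pre>{@code
 * DAGAccessControls acls = new DAGAccessControls();
 * acls.setUsersWithViewACLs(Collections.singleton("analyst"));
 * acls.setUsersWithModifyACLs(Collections.singleton("admin"));
 * dag.setAccessControls(acls);
 * }</pre>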
* @param accessControls Access Controls
* @return {@link DAG}
*/
public synchronized DAG setAccessControls(DAGAccessControls accessControls) {
this.dagAccessControls = accessControls;
return this;
}
/**
 * One of the methods that can be used to provide information about required
 * credentials when running on a secure cluster. A combination of this and
 * {@link #setCredentials(Credentials)} should be used to specify all
 * credentials required by a DAG. AM specific credentials are not used when
 * executing a DAG.
*
* This method can be used to specify a list of URIs for which Credentials
* need to be obtained so that the job can run. An incremental list of URIs
* can be provided by making multiple calls to the method.
*
 * Currently, {@link Credentials} can only be fetched for HDFS and other
* {@link org.apache.hadoop.fs.FileSystem} implementations that support
* credentials.
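 *
 * <p>For example (the path is illustrative):
 * <pre>{@code
 * dag.addURIsForCredentials(Collections.singletonList(
 *     new URI("hdfs://namenode:8020/user/data/input")));
 * }</pre>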
*
* @param uris
* a list of {@link URI}s
* @return {@link DAG}
*/
public synchronized DAG addURIsForCredentials(Collection<URI> uris) {
Preconditions.checkNotNull(uris, "URIs cannot be null");
urisForCredentials.addAll(uris);
return this;
}
/**
*
 * @return an unmodifiable collection of the URIs for which credentials
 *         are required.
*/
@Private
public synchronized Collection<URI> getURIsForCredentials() {
return Collections.unmodifiableCollection(urisForCredentials);
}
@Private
public synchronized Set<Vertex> getVertices() {
return Collections.unmodifiableSet(this.vertices.values());
}
/**
* Add an {@link Edge} connecting vertices in the DAG
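 *
 * <p>A sketch of adding a scatter-gather edge; {@code outputDescriptor} and
 * {@code inputDescriptor} are placeholders for the actual edge source and
 * destination classes:
 * <pre>{@code
 * EdgeProperty prop = EdgeProperty.create(
 *     DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
 *     SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor);
 * dag.addEdge(Edge.create(producer, consumer, prop));
 * }</pre>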
* @param edge The edge to be added
* @return {@link DAG}
*/
public synchronized DAG addEdge(Edge edge) {
// Sanity checks
if (!vertices.containsValue(edge.getInputVertex())) {
throw new IllegalArgumentException(
"Input vertex " + edge.getInputVertex() + " doesn't exist!");
}
if (!vertices.containsValue(edge.getOutputVertex())) {
throw new IllegalArgumentException(
"Output vertex " + edge.getOutputVertex() + " doesn't exist!");
}
if (edges.contains(edge)) {
throw new IllegalArgumentException(
"Edge " + edge + " already defined!");
}
// inform the vertices
edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge);
edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge);
edges.add(edge);
return this;
}
/**
* Add a {@link GroupInputEdge} to the DAG.
* @param edge {@link GroupInputEdge}
* @return {@link DAG}
*/
public synchronized DAG addEdge(GroupInputEdge edge) {
// Sanity checks
if (!vertexGroups.contains(edge.getInputVertexGroup())) {
throw new IllegalArgumentException(
"Input vertex " + edge.getInputVertexGroup() + " doesn't exist!");
}
if (!vertices.containsValue(edge.getOutputVertex())) {
throw new IllegalArgumentException(
"Output vertex " + edge.getOutputVertex() + " doesn't exist!");
}
if (groupInputEdges.contains(edge)) {
throw new IllegalArgumentException(
"Edge " + edge + " already defined!");
}
    VertexGroup inputGroup = edge.getInputVertexGroup();
    inputGroup.addOutputVertex(edge.getOutputVertex(), edge);
    groupInputEdges.add(edge);
    // add new edges between each member of the VertexGroup and the destination
    // vertex of the GroupInputEdge
    List<Edge> newEdges = Lists.newLinkedList();
    Vertex dstVertex = edge.getOutputVertex();
    for (Vertex member : inputGroup.getMembers()) {
newEdges.add(Edge.create(member, dstVertex, edge.getEdgeProperty()));
}
    dstVertex.addGroupInput(inputGroup.getGroupName(), inputGroup.getGroupInfo());
for (Edge e : newEdges) {
addEdge(e);
}
return this;
}
/**
* Get the DAG name
* @return DAG name
*/
public String getName() {
return this.name;
}
@Private
public Map<String, LocalResource> getTaskLocalFiles() {
return commonTaskLocalFiles;
}
void checkAndInferOneToOneParallelism() {
// infer all 1-1 via dependencies
// collect all 1-1 edges where the source parallelism is set
Set<Vertex> newKnownTasksVertices = Sets.newHashSet();
for (Vertex vertex : vertices.values()) {
if (vertex.getParallelism() > -1) {
newKnownTasksVertices.add(vertex);
}
}
// walk through all known source 1-1 edges and infer parallelism
// add newly inferred vertices for consideration as known sources
// the outer loop will run for every new level of inferring the parallelism
// however, the entire logic will process each vertex only once
while(!newKnownTasksVertices.isEmpty()) {
Set<Vertex> knownTasksVertices = Sets.newHashSet(newKnownTasksVertices);
newKnownTasksVertices.clear();
for (Vertex v : knownTasksVertices) {
for (Edge e : v.getOutputEdges()) {
if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
Vertex outVertex = e.getOutputVertex();
if (outVertex.getParallelism() == -1) {
LOG.info("Inferring parallelism for vertex: "
+ outVertex.getName() + " to be " + v.getParallelism()
+ " from 1-1 connection with vertex " + v.getName());
outVertex.setParallelism(v.getParallelism());
newKnownTasksVertices.add(outVertex);
}
}
}
}
}
// check for inconsistency and errors
for (Edge e : edges) {
Vertex inputVertex = e.getInputVertex();
Vertex outputVertex = e.getOutputVertex();
if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
if (inputVertex.getParallelism() != outputVertex.getParallelism()) {
// both should be equal or equal to -1.
if (outputVertex.getParallelism() != -1) {
throw new TezUncheckedException(
"1-1 Edge. Destination vertex parallelism must match source vertex. "
+ "Vertex: " + inputVertex.getName() + " does not match vertex: "
+ outputVertex.getName());
}
}
}
}
}
// AnnotatedVertex is used by verify()
private static class AnnotatedVertex {
Vertex v;
int index; //for Tarjan's algorithm
int lowlink; //for Tarjan's algorithm
boolean onstack; //for Tarjan's algorithm
private AnnotatedVertex(Vertex v) {
this.v = v;
index = -1;
lowlink = -1;
}
}
// verify()
//
// Default rules
// Illegal:
// - duplicate vertex id
// - cycles
//
// Ok:
// - orphaned vertex. Occurs in map-only
// - islands. Occurs if job has unrelated workflows.
//
// Not yet categorized:
// - orphaned vertex in DAG of >1 vertex. Could be unrelated map-only job.
// - v1->v2 via two edges. perhaps some self-join job would use this?
//
// "restricted" mode:
  // In the short term, the supported DAGs are limited. Call with restricted=true for these verifications.
// Illegal:
// - any vertex with more than one input or output edge. (n-ary input, n-ary merge)
@VisibleForTesting
void verify() throws IllegalStateException {
verify(true);
}
@VisibleForTesting
void verify(boolean restricted) throws IllegalStateException {
if (vertices.isEmpty()) {
throw new IllegalStateException("Invalid dag containing 0 vertices");
}
// check for valid vertices, duplicate vertex names,
// and prepare for cycle detection
Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
Map<Vertex, Set<String>> inboundVertexMap = new HashMap<Vertex, Set<String>>();
Map<Vertex, Set<String>> outboundVertexMap = new HashMap<Vertex, Set<String>>();
for (Vertex v : vertices.values()) {
if (vertexMap.containsKey(v.getName())) {
throw new IllegalStateException("DAG contains multiple vertices"
+ " with name: " + v.getName());
}
vertexMap.put(v.getName(), new AnnotatedVertex(v));
}
Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
for (Edge e : edges) {
// Construct structure for cycle detection
Vertex inputVertex = e.getInputVertex();
Vertex outputVertex = e.getOutputVertex();
List<Edge> edgeList = edgeMap.get(inputVertex);
if (edgeList == null) {
edgeList = new ArrayList<Edge>();
edgeMap.put(inputVertex, edgeList);
}
edgeList.add(e);
// Construct map for Input name verification
Set<String> inboundSet = inboundVertexMap.get(outputVertex);
if (inboundSet == null) {
inboundSet = new HashSet<String>();
inboundVertexMap.put(outputVertex, inboundSet);
}
inboundSet.add(inputVertex.getName());
// Construct map for Output name verification
Set<String> outboundSet = outboundVertexMap.get(inputVertex);
if (outboundSet == null) {
outboundSet = new HashSet<String>();
outboundVertexMap.put(inputVertex, outboundSet);
}
outboundSet.add(outputVertex.getName());
}
// check input and output names don't collide with vertex names
for (Vertex vertex : vertices.values()) {
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input : vertex.getInputs()) {
if (vertexMap.containsKey(input.getName())) {
throw new IllegalStateException("Vertex: "
+ vertex.getName()
+ " contains an Input with the same name as vertex: "
+ input.getName());
}
}
for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
output : vertex.getOutputs()) {
if (vertexMap.containsKey(output.getName())) {
throw new IllegalStateException("Vertex: "
+ vertex.getName()
+ " contains an Output with the same name as vertex: "
+ output.getName());
}
}
}
// Check for valid InputNames
for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
Vertex vertex = entry.getKey();
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input : vertex.getInputs()) {
if (entry.getValue().contains(input.getName())) {
throw new IllegalStateException("Vertex: "
+ vertex.getName()
+ " contains an incoming vertex and Input with the same name: "
+ input.getName());
}
}
}
// Check for valid OutputNames
for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
Vertex vertex = entry.getKey();
for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
output : vertex.getOutputs()) {
if (entry.getValue().contains(output.getName())) {
throw new IllegalStateException("Vertex: "
+ vertex.getName()
+ " contains an outgoing vertex and Output with the same name: "
+ output.getName());
}
}
}
    // Not checking for repeated input names / output names / vertex names on the
    // same vertex, since we only allow 1 at the moment.
    // When additional inputs are supported, this can be checked easily (and early)
    // within the addInput / addOutput call itself.
detectCycles(edgeMap, vertexMap);
checkAndInferOneToOneParallelism();
if (restricted) {
for (Edge e : edges) {
if (e.getEdgeProperty().getDataSourceType() !=
DataSourceType.PERSISTED) {
throw new IllegalStateException(
"Unsupported source type on edge. " + e);
}
if (e.getEdgeProperty().getSchedulingType() !=
SchedulingType.SEQUENTIAL) {
throw new IllegalStateException(
"Unsupported scheduling type on edge. " + e);
}
}
}
}
// Adaptation of Tarjan's algorithm for connected components.
// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
throws IllegalStateException {
    // Single-element array used as a mutable counter shared across recursive
    // strongConnect() calls. (A boxed Integer is re-assigned on increment, so
    // the updated value would never be visible to the caller.)
    int[] nextIndex = { 0 };
    Stack<AnnotatedVertex> stack = new Stack<AnnotatedVertex>();
for (AnnotatedVertex av : vertexMap.values()) {
if (av.index == -1) {
assert stack.empty();
strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
}
}
}
// part of Tarjan's algorithm for connected components.
// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
private void strongConnect(
AnnotatedVertex av,
Map<String, AnnotatedVertex> vertexMap,
Map<Vertex, List<Edge>> edgeMap,
      Stack<AnnotatedVertex> stack, int[] nextIndex) throws IllegalStateException {
    av.index = nextIndex[0];
    av.lowlink = nextIndex[0];
    nextIndex[0]++;
stack.push(av);
av.onstack = true;
List<Edge> edges = edgeMap.get(av.v);
if (edges != null) {
      for (Edge e : edges) {
AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName());
if (outVertex.index == -1) {
strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
} else if (outVertex.onstack) {
// strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
        // update lowlink in case outVertex should be considered the root of this component.
av.lowlink = Math.min(av.lowlink, outVertex.index);
}
}
}
if (av.lowlink == av.index) {
AnnotatedVertex pop = stack.pop();
pop.onstack = false;
if (pop != av) {
// there was something on the stack other than this "av".
// this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
StringBuilder message = new StringBuilder();
message.append(av.v.getName()).append(" <- ");
for (; pop != av; pop = stack.pop()) {
message.append(pop.v.getName()).append(" <- ");
pop.onstack = false;
}
message.append(av.v.getName());
throw new IllegalStateException("DAG contains a cycle: " + message);
} else {
// detect self-cycle
if (edgeMap.containsKey(pop.v)) {
for (Edge edge : edgeMap.get(pop.v)) {
if (edge.getOutputVertex().equals(pop.v)) {
throw new IllegalStateException("DAG contains a self-cycle on vertex:" + pop.v.getName());
}
}
}
}
topologicalVertexStack.push(av.v.getName());
}
}
// create protobuf message describing DAG
@Private
public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
boolean tezLrsAsArchive) {
verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
dagBuilder.setName(this.name);
if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
dagBuilder.setDagInfo(this.dagInfo);
}
if (!vertexGroups.isEmpty()) {
for (VertexGroup av : vertexGroups) {
GroupInfo groupInfo = av.getGroupInfo();
PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder();
groupBuilder.setGroupName(groupInfo.getGroupName());
for (Vertex v : groupInfo.getMembers()) {
groupBuilder.addGroupMembers(v.getName());
}
groupBuilder.addAllOutputs(groupInfo.outputs);
for (Map.Entry<String, InputDescriptor> entry :
groupInfo.edgeMergedInputs.entrySet()) {
groupBuilder.addEdgeMergedInputs(
PlanGroupInputEdgeInfo.newBuilder().setDestVertexName(entry.getKey()).
setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
}
dagBuilder.addVertexGroups(groupBuilder);
}
}
Credentials dagCredentials = new Credentials();
if (extraCredentials != null) {
dagCredentials.mergeAll(extraCredentials);
}
dagCredentials.mergeAll(credentials);
if (!commonTaskLocalFiles.isEmpty()) {
dagBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(commonTaskLocalFiles));
}
Preconditions.checkArgument(topologicalVertexStack.size() == vertices.size(),
"size of topologicalVertexStack is:" + topologicalVertexStack.size() +
" while size of vertices is:" + vertices.size() +
", make sure they are the same in order to sort the vertices");
while(!topologicalVertexStack.isEmpty()) {
Vertex vertex = vertices.get(topologicalVertexStack.pop());
// infer credentials, resources and parallelism from data source
Resource vertexTaskResource = vertex.getTaskResource();
if (vertexTaskResource == null) {
vertexTaskResource = Resource.newInstance(dagConf.getInt(
TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
}
Map<String, LocalResource> vertexLRs = Maps.newHashMap();
vertexLRs.putAll(vertex.getTaskLocalFiles());
List<DataSourceDescriptor> dataSources = vertex.getDataSources();
for (DataSourceDescriptor dataSource : dataSources) {
if (dataSource.getCredentials() != null) {
dagCredentials.addAll(dataSource.getCredentials());
}
if (dataSource.getAdditionalLocalFiles() != null) {
TezCommonUtils
.addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs,
"Vertex " + vertex.getName());
}
}
if (tezJarResources != null) {
TezCommonUtils
.addAdditionalLocalResources(tezJarResources, vertexLRs, "Vertex " + vertex.getName());
}
if (binaryConfig != null) {
vertexLRs.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, binaryConfig);
}
int vertexParallelism = vertex.getParallelism();
VertexLocationHint vertexLocationHint = vertex.getLocationHint();
if (dataSources.size() == 1) {
DataSourceDescriptor dataSource = dataSources.get(0);
if (vertexParallelism == -1 && dataSource.getNumberOfShards() > -1) {
vertexParallelism = dataSource.getNumberOfShards();
}
if (vertexLocationHint == null && dataSource.getLocationHint() != null) {
vertexLocationHint = dataSource.getLocationHint();
}
}
if (vertexParallelism == -1) {
Preconditions.checkState(vertexLocationHint == null,
"Cannot specify vertex location hint without specifying vertex parallelism. Vertex: "
+ vertex.getName());
} else if (vertexLocationHint != null) {
Preconditions.checkState(vertexParallelism == vertexLocationHint.getTaskLocationHints().size(),
"vertex task location hint must equal vertex parallelism. Vertex: " + vertex.getName());
}
for (DataSinkDescriptor dataSink : vertex.getDataSinks()) {
if (dataSink.getCredentials() != null) {
dagCredentials.addAll(dataSink.getCredentials());
}
}
VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
vertexBuilder.setName(vertex.getName());
vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
vertexBuilder.setProcessorDescriptor(DagTypeConverters
.convertToDAGPlan(vertex.getProcessorDescriptor()));
if (vertex.getInputs().size() > 0) {
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : vertex.getInputs()) {
vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
}
}
if (vertex.getOutputs().size() > 0) {
for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : vertex.getOutputs()) {
vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output));
}
}
//task config
PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
taskConfigBuilder.setNumTasks(vertexParallelism);
taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory());
taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores());
taskConfigBuilder.setJavaOpts(
TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), dagConf));
taskConfigBuilder.setTaskModule(vertex.getName());
if (!vertexLRs.isEmpty()) {
taskConfigBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(vertexLRs));
}
Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment());
TezYARNUtils.setupDefaultEnv(taskEnv, dagConf,
TezConfiguration.TEZ_TASK_LAUNCH_ENV,
TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
for (Map.Entry<String, String> entry : taskEnv.entrySet()) {
PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
envSettingBuilder.setKey(entry.getKey());
envSettingBuilder.setValue(entry.getValue());
taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
}
if (vertexLocationHint != null) {
if (vertexLocationHint.getTaskLocationHints() != null) {
for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) {
PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
// we can allow this later on if needed
if (hint.getAffinitizedTask() != null) {
throw new TezUncheckedException(
"Task based affinity may not be specified via the DAG API");
}
if (hint.getHosts() != null) {
taskLocationHintBuilder.addAllHost(hint.getHosts());
}
if (hint.getRacks() != null) {
taskLocationHintBuilder.addAllRack(hint.getRacks());
}
vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
}
}
}
if (vertex.getVertexManagerPlugin() != null) {
vertexBuilder.setVertexManagerPlugin(DagTypeConverters
.convertToDAGPlan(vertex.getVertexManagerPlugin()));
}
for (Edge inEdge : vertex.getInputEdges()) {
vertexBuilder.addInEdgeId(inEdge.getId());
}
for (Edge outEdge : vertex.getOutputEdges()) {
vertexBuilder.addOutEdgeId(outEdge.getId());
}
vertexBuilder.setTaskConfig(taskConfigBuilder);
dagBuilder.addVertex(vertexBuilder);
}
for (Edge edge : edges) {
EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
edgeBuilder.setId(edge.getId());
edgeBuilder.setInputVertexName(edge.getInputVertex().getName());
edgeBuilder.setOutputVertexName(edge.getOutputVertex().getName());
edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM) {
if (edge.getEdgeProperty().getEdgeManagerDescriptor() != null) {
edgeBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeManagerDescriptor()));
} // else the AM will deal with this.
}
dagBuilder.addEdge(edgeBuilder);
}
ConfigurationProto.Builder confProtoBuilder =
ConfigurationProto.newBuilder();
if (dagConf != null) {
Iterator<Entry<String, String>> iter = dagConf.iterator();
while (iter.hasNext()) {
Entry<String, String> entry = iter.next();
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
kvp.setKey(entry.getKey());
kvp.setValue(entry.getValue());
confProtoBuilder.addConfKeyValues(kvp);
}
}
if (dagAccessControls != null) {
Configuration aclConf = new Configuration(false);
dagAccessControls.serializeToConfiguration(aclConf);
Iterator<Entry<String, String>> aclConfIter = aclConf.iterator();
while (aclConfIter.hasNext()) {
Entry<String, String> entry = aclConfIter.next();
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
kvp.setKey(entry.getKey());
kvp.setValue(entry.getValue());
confProtoBuilder.addConfKeyValues(kvp);
}
}
dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
// should this replace BINARY_PB_CONF???
if (dagCredentials != null) {
dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
TezCommonUtils.logCredentials(LOG, dagCredentials, "dag");
}
return dagBuilder.build();
}
}