blob: 8ce3fb67e694c6567c03952401406e5dd059b8dd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.api;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.VertexGroup.GroupInfo;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* Defines a vertex in the DAG. It represents the application logic that
* processes and transforms the input data to create the output data. The
* vertex represents the template from which tasks are created to execute
* the application in parallel across a distributed execution environment.
*/
@Public
public class Vertex {
private final String vertexName;
private final ProcessorDescriptor processorDescriptor;
private int parallelism;
private VertexLocationHint locationHint;
private Resource taskResource;
private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
private Map<String, String> taskEnvironment = new HashMap<String, String>();
private Map<String, String> vertexConf = new HashMap<String, String>();
private VertexExecutionContext vertexExecutionContext;
private final Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs
= new HashMap<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
private final Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs
= new HashMap<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>();
private VertexManagerPluginDescriptor vertexManagerPlugin;
private final List<Vertex> inputVertices = new ArrayList<Vertex>();
private final List<Vertex> outputVertices = new ArrayList<Vertex>();
private final List<Edge> inputEdges = new ArrayList<Edge>();
private final List<Edge> outputEdges = new ArrayList<Edge>();
private final Map<String, GroupInfo> groupInputs = Maps.newHashMap();
private final List<DataSourceDescriptor> dataSources = Lists.newLinkedList();
private final List<DataSinkDescriptor> dataSinks = Lists.newLinkedList();
private String taskLaunchCmdOpts = "";
@InterfaceAudience.Private
Vertex(String vertexName,
ProcessorDescriptor processorDescriptor,
int parallelism,
Resource taskResource) {
this(vertexName, processorDescriptor, parallelism, taskResource, false);
}
private Vertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism) {
this(vertexName, processorDescriptor, parallelism, null, true);
}
private Vertex(String vertexName, ProcessorDescriptor processorDescriptor) {
this(vertexName, processorDescriptor, -1);
}
private Vertex(String vertexName,
ProcessorDescriptor processorDescriptor,
int parallelism,
Resource taskResource,
boolean allowIncomplete) {
this.vertexName = vertexName;
this.processorDescriptor = processorDescriptor;
this.parallelism = parallelism;
this.taskResource = taskResource;
if (parallelism < -1) {
throw new IllegalArgumentException(
"Parallelism should be -1 if determined by the AM"
+ ", otherwise should be >= 0");
}
if (!allowIncomplete && taskResource == null) {
throw new IllegalArgumentException("Resource cannot be null");
}
}
/**
* Create a new vertex with the given name.
*
* @param vertexName
* Name of the vertex
* @param processorDescriptor
* Description of the processor that is executed in every task of
* this vertex
* @param parallelism
* Number of tasks in this vertex. Set to -1 if this is going to be
* decided at runtime. Parallelism may change at runtime due to graph
* reconfigurations.
* @param taskResource
* Physical resources like memory/cpu thats used by each task of this
* vertex.
* @return a new Vertex with the given parameters
*/
public static Vertex create(String vertexName,
ProcessorDescriptor processorDescriptor,
int parallelism,
Resource taskResource) {
return new Vertex(vertexName, processorDescriptor, parallelism, taskResource);
}
/**
* Create a new vertex with the given name. <br>
* The vertex task resource will be picked from configuration <br>
* The vertex parallelism will be inferred. If it cannot be inferred then an
* error will be reported. This constructor may be used for vertices that have
* data sources, or connected via 1-1 edges or have runtime parallelism
* estimation via data source initializers or vertex managers. Calling this
* constructor is equivalent to calling
* {@link Vertex#Vertex(String, ProcessorDescriptor, int)} with the
* parallelism set to -1.
*
* @param vertexName
* Name of the vertex
* @param processorDescriptor
* Description of the processor that is executed in every task of
* this vertex
* @return a new Vertex with the given parameters
*/
public static Vertex create(String vertexName, ProcessorDescriptor processorDescriptor) {
return new Vertex(vertexName, processorDescriptor);
}
/**
* Create a new vertex with the given name and parallelism. <br>
* The vertex task resource will be picked from configuration
* {@link TezConfiguration#TEZ_TASK_RESOURCE_MEMORY_MB} &
* {@link TezConfiguration#TEZ_TASK_RESOURCE_CPU_VCORES} Applications that
* want more control over their task resource specification may create their
* own logic to determine task resources and use
* {@link Vertex#Vertex(String, ProcessorDescriptor, int, Resource)} to create
* the Vertex.
*
* @param vertexName
* Name of the vertex
* @param processorDescriptor
* Description of the processor that is executed in every task of
* this vertex
* @param parallelism
* Number of tasks in this vertex. Set to -1 if this is going to be
* decided at runtime. Parallelism may change at runtime due to graph
* reconfigurations.
* @return a new Vertex with the given parameters
*/
public static Vertex create(String vertexName, ProcessorDescriptor processorDescriptor,
int parallelism) {
return new Vertex(vertexName, processorDescriptor, parallelism);
}
/**
* Get the vertex name
* @return vertex name
*/
public String getName() {
return vertexName;
}
/**
* Get the vertex task processor descriptor
* @return process descriptor
*/
public ProcessorDescriptor getProcessorDescriptor() {
return this.processorDescriptor;
}
/**
* Get the specified number of tasks specified to run in this vertex. It may
* be -1 if the parallelism is defined at runtime. Parallelism may change at
* runtime
* @return vertex parallelism
*/
public int getParallelism() {
return parallelism;
}
/**
* Set the number of tasks for this vertex
* @param parallelism Parallelism for this vertex
*/
void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
/**
* Get the resources for the vertex
* @return the physical resources like pcu/memory of each vertex task
*/
public Resource getTaskResource() {
return taskResource;
}
/**
* Specify location hints for the tasks of this vertex. Hints must be specified
* for all tasks as defined by the parallelism
* @param locationHint list of locations for each task in the vertex
* @return this Vertex
*/
public Vertex setLocationHint(VertexLocationHint locationHint) {
List<TaskLocationHint> locations = locationHint.getTaskLocationHints();
if (locations == null) {
return this;
}
Preconditions.checkArgument((locations.size() == parallelism),
"Locations array length must match the parallelism set for the vertex");
this.locationHint = locationHint;
return this;
}
// used internally to create parallelism location resource file
VertexLocationHint getLocationHint() {
return locationHint;
}
/**
* Set the files etc that must be provided to the tasks of this vertex
* @param localFiles
* files that must be available locally for each task. These files
* may be regular files, archives etc. as specified by the value
* elements of the map.
* @return this Vertex
*/
public Vertex addTaskLocalFiles(Map<String, LocalResource> localFiles) {
if (localFiles != null) {
TezCommonUtils.addAdditionalLocalResources(localFiles, taskLocalResources, "Vertex " + getName());
}
return this;
}
/**
* Get the files etc that must be provided by the tasks of this vertex
* @return local files of the vertex. Key is the file name.
*/
public Map<String, LocalResource> getTaskLocalFiles() {
return taskLocalResources;
}
/**
* Set the Key-Value pairs of environment variables for tasks of this vertex.
* This method should be used if different vertices need different env. Else,
* set environment for all vertices via Tezconfiguration#TEZ_TASK_LAUNCH_ENV
* @param environment
* @return this Vertex
* NullPointerException if {@code environment} is {@code null}
*/
public Vertex setTaskEnvironment(Map<String, String> environment) {
this.taskEnvironment.putAll(Objects.requireNonNull(environment));
return this;
}
/**
* Get the environment variables of the tasks
* @return environment variable map
*/
public Map<String, String> getTaskEnvironment() {
return taskEnvironment;
}
public Map<String, String> getConf() {
return vertexConf;
}
/**
* Set the command opts for tasks of this vertex. This method should be used
* when different vertices have different opts. Else, set the launch opts for '
* all vertices via Tezconfiguration#TEZ_TASK_LAUNCH_CMD_OPTS
* @param cmdOpts
* @return this Vertex
*/
public Vertex setTaskLaunchCmdOpts(String cmdOpts){
this.taskLaunchCmdOpts = cmdOpts;
return this;
}
/**
* Specifies an external data source for a Vertex. This is meant to be used
* when a Vertex reads Input directly from an external source </p>
*
* For vertices which read data generated by another vertex - use the
* {@link DAG addEdge} method.
*
* If a vertex needs to use data generated by another vertex in the DAG and
* also from an external source, a combination of this API and the DAG.addEdge
* API can be used. </p>
*
* Note: If more than one RootInput exists on a vertex, which generates events
* which need to be routed, or generates information to set parallelism, a
* custom vertex manager should be setup to handle this. Not using a custom
* vertex manager for such a scenario will lead to a runtime failure.
*
* @param inputName
* the name of the input. This will be used when accessing the input
* in the {@link LogicalIOProcessor}
* @param dataSourceDescriptor
* the @{link DataSourceDescriptor} for this input.
* @return this Vertex
*/
public Vertex addDataSource(String inputName, DataSourceDescriptor dataSourceDescriptor) {
Preconditions.checkArgument(StringUtils.isNotBlank(inputName),
"InputName should not be null, empty or white space only, inputName=" + inputName);
Preconditions.checkArgument(!additionalInputs.containsKey(inputName),
"Duplicated input:" + inputName + ", vertexName=" + vertexName);
additionalInputs
.put(inputName, new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
inputName, dataSourceDescriptor.getInputDescriptor(),
dataSourceDescriptor.getInputInitializerDescriptor()));
this.dataSources.add(dataSourceDescriptor);
return this;
}
/**
* Specifies an external data sink for a Vertex. This is meant to be used when
* a Vertex writes Output directly to an external destination. </p>
*
* If an output of the vertex is meant to be consumed by another Vertex in the
* DAG - use the {@link DAG addEdge} method.
*
* If a vertex needs generate data to an external source as well as for
* another Vertex in the DAG, a combination of this API and the DAG.addEdge
* API can be used.
*
* @param outputName
* the name of the output. This will be used when accessing the
* output in the {@link LogicalIOProcessor}
* @param dataSinkDescriptor
* the {@link DataSinkDescriptor} for this output
* @return this Vertex
*/
public Vertex addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) {
Preconditions.checkArgument(StringUtils.isNotBlank(outputName),
"OutputName should not be null, empty or white space only, outputName=" + outputName);
Preconditions.checkArgument(!additionalOutputs.containsKey(outputName),
"Duplicated output:" + outputName + ", vertexName=" + vertexName);
additionalOutputs
.put(outputName, new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
outputName, dataSinkDescriptor.getOutputDescriptor(),
dataSinkDescriptor.getOutputCommitterDescriptor()));
this.dataSinks.add(dataSinkDescriptor);
return this;
}
Vertex addAdditionalDataSink(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
Preconditions.checkArgument(StringUtils.isNotBlank(output.getName()),
"OutputName should not be null, empty or white space only, outputName=" + output.getName());
Preconditions.checkArgument(!additionalOutputs.containsKey(output.getName()),
"Duplicated output:" + output.getName() + ", vertexName=" + vertexName);
additionalOutputs.put(output.getName(), output);
return this;
}
/**
* Specifies a {@link VertexManagerPlugin} for the vertex. This plugin can be
* used to modify the parallelism or reconfigure the vertex at runtime using
* user defined code embedded in the plugin
*
* @param vertexManagerPluginDescriptor
* @return this Vertex
*/
public Vertex setVertexManagerPlugin(
VertexManagerPluginDescriptor vertexManagerPluginDescriptor) {
this.vertexManagerPlugin = vertexManagerPluginDescriptor;
return this;
}
/**
* Get the launch command opts for tasks in this vertex
* @return launch command opts
*/
public String getTaskLaunchCmdOpts(){
return taskLaunchCmdOpts;
}
/**
* This is currently used to setup additional configuration parameters which will be available
* in the Vertex specific configuration used in the AppMaster. This API would be used for properties which
* are used by the Tez framework while executing this vertex as part of a larger DAG.
* As an example, the number of attempts for a task. </p>
*
* A vertex inherits it's Configuration from the DAG, and can override properties for this Vertex only
* using this method </p>
*
* Currently, properties which are used by the task runtime, such as the task to AM
* heartbeat interval, cannot be changed using this method. </p>
*
* Note: This API does not add any configuration to runtime components such as InputInitializers,
* OutputCommitters, Inputs and Outputs.
*
* @param property the property name
* @param value the value for the property
* @return the current DAG being constructed
*/
@InterfaceStability.Unstable
public Vertex setConf(String property, String value) {
TezConfiguration.validateProperty(property, Scope.VERTEX);
this.vertexConf.put(property, value);
return this;
}
/**
* Sets the execution context for this Vertex - i.e. the Task Scheduler, ContainerLauncher and
* TaskCommunicator to be used. Also whether the vertex will be executed within the AM.
* If partially specified, the default components in Tez will be used - which may or may not work
* with the custom context.
*
* @param vertexExecutionContext the execution context for the vertex.
*
* @return this Vertex
*/
public Vertex setExecutionContext(VertexExecutionContext vertexExecutionContext) {
this.vertexExecutionContext = vertexExecutionContext;
return this;
}
/**
* The execution context for a running vertex.
*/
@Public
@InterfaceStability.Unstable
public static class VertexExecutionContext {
final boolean executeInAm;
final boolean executeInContainers;
final String taskSchedulerName;
final String containerLauncherName;
final String taskCommName;
/**
* Create an execution context which specifies whether the vertex needs to be executed in the
* AM
*
* @param executeInAm whether to execute the vertex in the AM
* @return the relevant execution context
*/
public static VertexExecutionContext createExecuteInAm(boolean executeInAm) {
return new VertexExecutionContext(executeInAm, false);
}
/**
* Create an execution context which specifies whether the vertex needs to be executed in
* regular containers
*
* @param executeInContainers whether to execute the vertex in regular containers
* @return the relevant execution context
*/
public static VertexExecutionContext createExecuteInContainers(boolean executeInContainers) {
return new VertexExecutionContext(false, executeInContainers);
}
/**
* @param taskSchedulerName the task scheduler name which was setup while creating the
* {@link org.apache.tez.client.TezClient}
* @param containerLauncherName the container launcher name which was setup while creating the
* {@link org.apache.tez.client.TezClient}
* @param taskCommName the task communicator name which was setup while creating the
* {@link org.apache.tez.client.TezClient}
* @return the relevant execution context
*/
public static VertexExecutionContext create(String taskSchedulerName,
String containerLauncherName,
String taskCommName) {
return new VertexExecutionContext(taskSchedulerName, containerLauncherName, taskCommName);
}
private VertexExecutionContext(boolean executeInAm, boolean executeInContainers) {
this(executeInAm, executeInContainers, null, null, null);
}
private VertexExecutionContext(String taskSchedulerName, String containerLauncherName,
String taskCommName) {
this(false, false, taskSchedulerName, containerLauncherName, taskCommName);
}
private VertexExecutionContext(boolean executeInAm, boolean executeInContainers,
String taskSchedulerName, String containerLauncherName,
String taskCommName) {
if (executeInAm || executeInContainers) {
Preconditions.checkState(!(executeInAm && executeInContainers),
"executeInContainers and executeInAM are mutually exclusive");
Preconditions.checkState(
taskSchedulerName == null && containerLauncherName == null && taskCommName == null,
"Uber (in-AM) or container execution cannot be enabled with a custom plugins. TaskScheduler=" +
taskSchedulerName + ", ContainerLauncher=" + containerLauncherName +
", TaskCommunicator=" + taskCommName);
}
if (taskSchedulerName != null || containerLauncherName != null || taskCommName != null) {
Preconditions.checkState(executeInAm == false && executeInContainers == false,
"Uber (in-AM) and container execution cannot be enabled with a custom plugins. TaskScheduler=" +
taskSchedulerName + ", ContainerLauncher=" + containerLauncherName +
", TaskCommunicator=" + taskCommName);
}
this.executeInAm = executeInAm;
this.executeInContainers = executeInContainers;
this.taskSchedulerName = taskSchedulerName;
this.containerLauncherName = containerLauncherName;
this.taskCommName = taskCommName;
}
public boolean shouldExecuteInAm() {
return executeInAm;
}
public boolean shouldExecuteInContainers() {
return executeInContainers;
}
public String getTaskSchedulerName() {
return taskSchedulerName;
}
public String getContainerLauncherName() {
return containerLauncherName;
}
public String getTaskCommName() {
return taskCommName;
}
@Override
public String toString() {
return "VertexExecutionContext{" +
"executeInAm=" + executeInAm +
", executeInContainers=" + executeInContainers +
", taskSchedulerName='" + taskSchedulerName + '\'' +
", containerLauncherName='" + containerLauncherName + '\'' +
", taskCommName='" + taskCommName + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
VertexExecutionContext that = (VertexExecutionContext) o;
if (executeInAm != that.executeInAm) {
return false;
}
if (executeInContainers != that.executeInContainers) {
return false;
}
if (taskSchedulerName != null ? !taskSchedulerName.equals(that.taskSchedulerName) :
that.taskSchedulerName != null) {
return false;
}
if (containerLauncherName != null ?
!containerLauncherName.equals(that.containerLauncherName) :
that.containerLauncherName != null) {
return false;
}
return !(taskCommName != null ? !taskCommName.equals(that.taskCommName) :
that.taskCommName != null);
}
@Override
public int hashCode() {
int result = (executeInAm ? 1 : 0);
result = 31 * result + (executeInContainers ? 1 : 0);
result = 31 * result + (taskSchedulerName != null ? taskSchedulerName.hashCode() : 0);
result = 31 * result + (containerLauncherName != null ? containerLauncherName.hashCode() : 0);
result = 31 * result + (taskCommName != null ? taskCommName.hashCode() : 0);
return result;
}
}
@Override
public String toString() {
return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";
}
VertexManagerPluginDescriptor getVertexManagerPlugin() {
return vertexManagerPlugin;
}
Map<String, GroupInfo> getGroupInputs() {
return groupInputs;
}
void addGroupInput(String groupName, GroupInfo groupInputInfo) {
if (groupInputs.put(groupName, groupInputInfo) != null) {
throw new IllegalStateException(
"Vertex: " + getName() +
" already has group input with name:" + groupName);
}
}
void addInputVertex(Vertex inputVertex, Edge edge) {
inputVertices.add(inputVertex);
inputEdges.add(edge);
}
void addOutputVertex(Vertex outputVertex, Edge edge) {
outputVertices.add(outputVertex);
outputEdges.add(edge);
}
/**
* Get the input vertices for this vertex
* @return List of input vertices
*/
public List<Vertex> getInputVertices() {
return Collections.unmodifiableList(inputVertices);
}
/**
* Get the output vertices for this vertex
* @return List of output vertices
*/
public List<Vertex> getOutputVertices() {
return Collections.unmodifiableList(outputVertices);
}
/**
* Set the cpu/memory etc resources used by tasks of this vertex
* @param resource {@link Resource} for the tasks of this vertex
*/
void setTaskResource(Resource resource) {
this.taskResource = resource;
}
@Private
public List<DataSourceDescriptor> getDataSources() {
return dataSources;
}
@Private
public List<DataSinkDescriptor> getDataSinks() {
return dataSinks;
}
@Private
public VertexExecutionContext getVertexExecutionContext() {
return this.vertexExecutionContext;
}
List<Edge> getInputEdges() {
return inputEdges;
}
List<Edge> getOutputEdges() {
return outputEdges;
}
List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> getInputs() {
return Lists.newArrayList(additionalInputs.values());
}
List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> getOutputs() {
return Lists.newArrayList(additionalOutputs.values());
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((vertexName == null) ? 0 : vertexName.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Vertex other = (Vertex) obj;
if (vertexName == null) {
if (other.vertexName != null)
return false;
} else if (!vertexName.equals(other.vertexName))
return false;
return true;
}
}