| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tez.dag.api; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.Stack; |
| import java.util.Objects; |
| |
| import org.apache.commons.collections4.BidiMap; |
| import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.tez.client.CallerContext; |
| import org.apache.tez.common.JavaOptsChecker; |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.dag.api.Vertex.VertexExecutionContext; |
| import org.apache.tez.dag.api.records.DAGProtos; |
| import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceAudience.Public; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.tez.client.TezClientUtils; |
| import org.apache.tez.common.security.DAGAccessControls; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.TezYARNUtils; |
| import org.apache.tez.dag.api.EdgeProperty.DataMovementType; |
| import org.apache.tez.dag.api.EdgeProperty.DataSourceType; |
| import org.apache.tez.dag.api.EdgeProperty.SchedulingType; |
| import org.apache.tez.dag.api.VertexGroup.GroupInfo; |
| import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; |
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; |
| import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo; |
| import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; |
| import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.tez.common.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Top level entity that defines the DAG (Directed Acyclic Graph) representing |
| * the data flow graph. Consists of a set of Vertices and Edges connecting the |
| * vertices. Vertices represent transformations of data and edges represent |
| * movement of data between vertices. |
| */ |
| @Public |
| public class DAG { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(DAG.class); |
| |
| final BidiMap<String, Vertex> vertices = |
| new DualLinkedHashBidiMap<String, Vertex>(); |
| final Set<Edge> edges = Sets.newHashSet(); |
| final String name; |
| final Collection<URI> urisForCredentials = new HashSet<URI>(); |
| Credentials credentials = new Credentials(); |
| Set<VertexGroup> vertexGroups = Sets.newHashSet(); |
| |
| Set<GroupInputEdge> groupInputEdges = Sets.newHashSet(); |
| |
| private DAGAccessControls dagAccessControls; |
| Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap(); |
| String dagInfo; |
| CallerContext callerContext; |
| private Map<String,String> dagConf = new HashMap<String, String>(); |
| private VertexExecutionContext defaultExecutionContext; |
| |
| private DAG(String name) { |
| this.name = name; |
| } |
| |
| /** |
| * Create a DAG with the specified name. |
| * @param name the name of the DAG |
| * @return this {@link DAG} |
| */ |
| public static DAG create(String name) { |
| return new DAG(name); |
| } |
| |
| /** |
| * Set the files etc that must be provided to the tasks of this DAG |
| * @param localFiles |
| * files that must be available locally for each task. These files |
| * may be regular files, archives etc. as specified by the value |
| * elements of the map. |
| * @return {@link DAG} |
| */ |
| public synchronized DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) { |
| Objects.requireNonNull(localFiles); |
| TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles, "DAG " + getName()); |
| return this; |
| } |
| |
| public synchronized DAG addVertex(Vertex vertex) { |
| if (vertices.containsKey(vertex.getName())) { |
| throw new IllegalStateException( |
| "Vertex " + vertex.getName() + " already defined!"); |
| } |
| vertices.put(vertex.getName(), vertex); |
| return this; |
| } |
| |
| public synchronized Vertex getVertex(String vertexName) { |
| return vertices.get(vertexName); |
| } |
| |
| /** |
| * One of the methods that can be used to provide information about required |
| * Credentials when running on a secure cluster. A combination of this and |
| * addURIsForCredentials should be used to specify information about all |
| * credentials required by a DAG. AM specific credentials are not used when |
| * executing a DAG. |
| * |
| * Set credentials which will be required to run this dag. This method can be |
| * used if the client has already obtained some or all of the required |
| * credentials. |
| * |
| * @param credentials Credentials for the DAG |
| * @return {@link DAG} |
| */ |
| public synchronized DAG setCredentials(Credentials credentials) { |
| this.credentials = credentials; |
| return this; |
| } |
| |
| /** |
| * Set description info for this DAG that can be used for visualization purposes. |
| * @param dagInfo JSON blob as a serialized string. |
| * Recognized keys by the UI are: |
| * "context" - The application context in which this DAG is being used. |
| * For example, this could be set to "Hive" or "Pig" if |
| * this is being run as part of a Hive or Pig script. |
| * "description" - General description on what this DAG is going to do. |
| * In the case of Hive, this could be the SQL query text. |
| * @return {@link DAG} |
| */ |
| @Deprecated |
| public synchronized DAG setDAGInfo(String dagInfo) { |
| Objects.requireNonNull(dagInfo); |
| this.dagInfo = dagInfo; |
| return this; |
| } |
| |
| |
| /** |
| * Set the Context in which Tez is being called. |
| * @param callerContext Caller Context |
| * @return {@link DAG} |
| */ |
| public synchronized DAG setCallerContext(CallerContext callerContext) { |
| Objects.requireNonNull(callerContext); |
| this.callerContext = callerContext; |
| return this; |
| } |
| |
| /** |
| * Create a group of vertices that share a common output. This can be used to implement |
| * unions efficiently. |
| * @param name Name of the group. |
| * @param members {@link Vertex} members of the group |
| * @return {@link DAG} |
| */ |
| public synchronized VertexGroup createVertexGroup(String name, Vertex... members) { |
| // vertex group name should be unique. |
| VertexGroup uv = new VertexGroup(name, members); |
| if (!vertexGroups.add(uv)) { |
| throw new IllegalStateException( |
| "VertexGroup " + name + " already defined!"); |
| } |
| |
| return uv; |
| } |
| |
| @Private |
| public synchronized Credentials getCredentials() { |
| return this.credentials; |
| } |
| |
| |
| /** |
| * Set Access controls for the DAG. Which user/groups can view the DAG progess/history and |
| * who can modify the DAG i.e. kill the DAG. |
| * The owner of the Tez Session and the user submitting the DAG are super-users and have access |
| * to all operations on the DAG. |
| * @param accessControls Access Controls |
| * @return {@link DAG} |
| */ |
| public synchronized DAG setAccessControls(DAGAccessControls accessControls) { |
| this.dagAccessControls = accessControls; |
| return this; |
| } |
| |
| @Private |
| public synchronized DAGAccessControls getDagAccessControls() { |
| return dagAccessControls; |
| } |
| |
| /** |
| * One of the methods that can be used to provide information about required |
| * Credentials when running on a secure cluster. A combination of this and |
| * setCredentials should be used to specify information about all credentials |
| * required by a DAG. AM specific credentials are not used when executing a |
| * DAG. |
| * |
| * This method can be used to specify a list of URIs for which Credentials |
| * need to be obtained so that the job can run. An incremental list of URIs |
| * can be provided by making multiple calls to the method. |
| * |
| * Currently, @{link credentials} can only be fetched for HDFS and other |
| * {@link org.apache.hadoop.fs.FileSystem} implementations that support |
| * credentials. |
| * |
| * @param uris |
| * a list of {@link URI}s |
| * @return {@link DAG} |
| */ |
| public synchronized DAG addURIsForCredentials(Collection<URI> uris) { |
| Objects.requireNonNull(uris, "URIs cannot be null"); |
| urisForCredentials.addAll(uris); |
| return this; |
| } |
| |
| /** |
| * |
| * @return an unmodifiable list representing the URIs for which credentials |
| * are required. |
| */ |
| @Private |
| public synchronized Collection<URI> getURIsForCredentials() { |
| return Collections.unmodifiableCollection(urisForCredentials); |
| } |
| |
| @Private |
| public synchronized Set<Vertex> getVertices() { |
| return Collections.unmodifiableSet(this.vertices.values()); |
| } |
| |
| /** |
| * Add an {@link Edge} connecting vertices in the DAG |
| * @param edge The edge to be added |
| * @return {@link DAG} |
| */ |
| public synchronized DAG addEdge(Edge edge) { |
| // Sanity checks |
| if (!vertices.containsValue(edge.getInputVertex())) { |
| throw new IllegalArgumentException( |
| "Input vertex " + edge.getInputVertex() + " doesn't exist!"); |
| } |
| if (!vertices.containsValue(edge.getOutputVertex())) { |
| throw new IllegalArgumentException( |
| "Output vertex " + edge.getOutputVertex() + " doesn't exist!"); |
| } |
| if (edges.contains(edge)) { |
| throw new IllegalArgumentException( |
| "Edge " + edge + " already defined!"); |
| } |
| |
| // inform the vertices |
| edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge); |
| edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge); |
| |
| edges.add(edge); |
| return this; |
| } |
| |
| /** |
| * Add a {@link GroupInputEdge} to the DAG. |
| * @param edge {@link GroupInputEdge} |
| * @return {@link DAG} |
| */ |
| public synchronized DAG addEdge(GroupInputEdge edge) { |
| // Sanity checks |
| if (!vertexGroups.contains(edge.getInputVertexGroup())) { |
| throw new IllegalArgumentException( |
| "Input vertex " + edge.getInputVertexGroup() + " doesn't exist!"); |
| } |
| if (!vertices.containsValue(edge.getOutputVertex())) { |
| throw new IllegalArgumentException( |
| "Output vertex " + edge.getOutputVertex() + " doesn't exist!"); |
| } |
| if (groupInputEdges.contains(edge)) { |
| throw new IllegalArgumentException( |
| "GroupInputEdge " + edge + " already defined!"); |
| } |
| |
| VertexGroup av = edge.getInputVertexGroup(); |
| av.addOutputVertex(edge.getOutputVertex(), edge); |
| groupInputEdges.add(edge); |
| |
| // add new edge between members of VertexGroup and destVertex of the GroupInputEdge |
| List<Edge> newEdges = Lists.newLinkedList(); |
| Vertex dstVertex = edge.getOutputVertex(); |
| VertexGroup uv = edge.getInputVertexGroup(); |
| for (Vertex member : uv.getMembers()) { |
| newEdges.add(Edge.create(member, dstVertex, edge.getEdgeProperty())); |
| } |
| dstVertex.addGroupInput(uv.getGroupName(), uv.getGroupInfo()); |
| |
| for (Edge e : newEdges) { |
| addEdge(e); |
| } |
| |
| return this; |
| } |
| |
| /** |
| * Get the DAG name |
| * @return DAG name |
| */ |
| public String getName() { |
| return this.name; |
| } |
| |
| /** |
| * This is currently used to setup additional configuration parameters which will be available |
| * in the DAG configuration used in the AppMaster. This API would be used for properties which |
| * are used by the Tez framework while executing the DAG. As an example, the number of attempts |
| * for a task.</p> |
| * |
| * A DAG inherits it's base properties from the ApplicationMaster within which it's running. This |
| * method allows for these properties to be overridden. |
| * |
| * Currently, properties which are used by the task runtime, such as the task to AM |
| * heartbeat interval, cannot be changed using this method. </p> |
| * |
| * Note: This API does not add any configuration to runtime components such as InputInitializers, |
| * OutputCommitters, Inputs and Outputs. |
| * |
| * @param property the property name |
| * @param value the value for the property |
| * @return the current DAG being constructed |
| */ |
| @InterfaceStability.Unstable |
| public DAG setConf(String property, String value) { |
| TezConfiguration.validateProperty(property, Scope.DAG); |
| dagConf.put(property, value); |
| return this; |
| } |
| |
| /** |
| * Set history log level for this DAG. This config overrides the default or one set at the session |
| * level. |
| * |
| * @param historyLogLevel The ATS history log level for this DAG. |
| * |
| * @return this DAG |
| */ |
| public DAG setHistoryLogLevel(HistoryLogLevel historyLogLevel) { |
| return this.setConf(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL, historyLogLevel.name()); |
| } |
| |
| /** |
| * Sets the default execution context for the DAG. This can be overridden at a per Vertex level. |
| * See {@link org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)} |
| * |
| * @param vertexExecutionContext the default execution context for the DAG |
| * |
| * @return this DAG |
| */ |
| @Public |
| @InterfaceStability.Unstable |
| public synchronized DAG setExecutionContext(VertexExecutionContext vertexExecutionContext) { |
| this.defaultExecutionContext = vertexExecutionContext; |
| return this; |
| } |
| |
| @Private |
| VertexExecutionContext getDefaultExecutionContext() { |
| return this.defaultExecutionContext; |
| } |
| |
| @Private |
| @VisibleForTesting |
| public Map<String,String> getDagConf() { |
| return dagConf; |
| } |
| |
| @Private |
| public Map<String, LocalResource> getTaskLocalFiles() { |
| return commonTaskLocalFiles; |
| } |
| |
| @Private |
| @VisibleForTesting |
| void checkAndInferOneToOneParallelism() { |
| // infer all 1-1 via dependencies |
| // collect all 1-1 edges where the source parallelism is set |
| Set<Vertex> newKnownTasksVertices = Sets.newHashSet(); |
| for (Vertex vertex : vertices.values()) { |
| if (vertex.getParallelism() > -1) { |
| newKnownTasksVertices.add(vertex); |
| } |
| } |
| |
| // walk through all known source 1-1 edges and infer parallelism |
| // add newly inferred vertices for consideration as known sources |
| // the outer loop will run for every new level of inferring the parallelism |
| // however, the entire logic will process each vertex only once |
| while(!newKnownTasksVertices.isEmpty()) { |
| Set<Vertex> knownTasksVertices = Sets.newHashSet(newKnownTasksVertices); |
| newKnownTasksVertices.clear(); |
| for (Vertex v : knownTasksVertices) { |
| for (Edge e : v.getOutputEdges()) { |
| if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) { |
| Vertex outVertex = e.getOutputVertex(); |
| if (outVertex.getParallelism() == -1) { |
| LOG.info("Inferring parallelism for vertex: " |
| + outVertex.getName() + " to be " + v.getParallelism() |
| + " from 1-1 connection with vertex " + v.getName()); |
| outVertex.setParallelism(v.getParallelism()); |
| newKnownTasksVertices.add(outVertex); |
| } |
| } |
| } |
| } |
| } |
| |
| // check for inconsistency and errors |
| for (Edge e : edges) { |
| Vertex inputVertex = e.getInputVertex(); |
| Vertex outputVertex = e.getOutputVertex(); |
| |
| if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) { |
| if (inputVertex.getParallelism() != outputVertex.getParallelism()) { |
| // both should be equal or equal to -1. |
| if (outputVertex.getParallelism() != -1) { |
| throw new TezUncheckedException( |
| "1-1 Edge. Destination vertex parallelism must match source vertex. " |
| + "Vertex: " + inputVertex.getName() + " does not match vertex: " |
| + outputVertex.getName()); |
| } |
| } |
| } |
| } |
| |
| // check the vertices with -1 parallelism, currently only 3 cases are allowed to has -1 parallelism. |
| // It is OK not using topological order to check vertices here. |
| // 1. has input initializers |
| // 2. 1-1 uninited sources |
| // 3. has custom vertex manager |
| for (Vertex vertex : vertices.values()) { |
| if (vertex.getParallelism() == -1) { |
| boolean hasInputInitializer = false; |
| if (vertex.getDataSources() != null && !vertex.getDataSources().isEmpty()) { |
| for (DataSourceDescriptor ds : vertex.getDataSources()) { |
| if (ds.getInputInitializerDescriptor() != null) { |
| hasInputInitializer = true; |
| break; |
| } |
| } |
| } |
| if (hasInputInitializer) { |
| continue; |
| } else { |
| // Account for the case where the vertex has a data source with a determined number of |
| // shards e.g. splits calculated on the client and not in the AM |
| // In this case, vertex parallelism is setup later using the data source's numShards |
| // and as a result, an initializer is not needed. |
| if (vertex.getDataSources() != null |
| && vertex.getDataSources().size() == 1 |
| && vertex.getDataSources().get(0).getNumberOfShards() > -1) { |
| continue; |
| } |
| } |
| |
| boolean has1to1UninitedSources = false; |
| if (vertex.getInputVertices()!= null && !vertex.getInputVertices().isEmpty()) { |
| for (Vertex srcVertex : vertex.getInputVertices()) { |
| if (srcVertex.getParallelism() == -1) { |
| has1to1UninitedSources = true; |
| break; |
| } |
| } |
| } |
| if (has1to1UninitedSources) { |
| continue; |
| } |
| |
| if (vertex.getVertexManagerPlugin() != null) { |
| continue; |
| } |
| throw new IllegalStateException(vertex.getName() + |
| " has -1 tasks but does not have input initializers, " + |
| "1-1 uninited sources or custom vertex manager to set it at runtime"); |
| } |
| } |
| } |
| |
| // AnnotatedVertex is used by verify() |
| private static class AnnotatedVertex { |
| Vertex v; |
| |
| int index; //for Tarjan's algorithm |
| int lowlink; //for Tarjan's algorithm |
| boolean onstack; //for Tarjan's algorithm |
| |
| |
| private AnnotatedVertex(Vertex v) { |
| this.v = v; |
| index = -1; |
| lowlink = -1; |
| } |
| } |
| |
| // verify() |
| // |
| // Default rules |
| // Illegal: |
| // - duplicate vertex id |
| // - cycles |
| // |
| // Ok: |
| // - orphaned vertex. Occurs in map-only |
| // - islands. Occurs if job has unrelated workflows. |
| // |
| // Not yet categorized: |
| // - orphaned vertex in DAG of >1 vertex. Could be unrelated map-only job. |
| // - v1->v2 via two edges. perhaps some self-join job would use this? |
| // |
| // "restricted" mode: |
| // In short term, the supported DAGs are limited. Call with restricted=true for these verifications. |
| // Illegal: |
| // - any vertex with more than one input or output edge. (n-ary input, n-ary merge) |
| @VisibleForTesting |
| void verify() throws IllegalStateException { |
| verify(true); |
| } |
| |
| @VisibleForTesting |
| Deque<String> verify(boolean restricted) throws IllegalStateException { |
| if (vertices.isEmpty()) { |
| throw new IllegalStateException("Invalid dag containing 0 vertices"); |
| } |
| |
| // check for valid vertices, duplicate vertex names, |
| // and prepare for cycle detection |
| Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>(); |
| Map<Vertex, Set<String>> inboundVertexMap = new HashMap<Vertex, Set<String>>(); |
| Map<Vertex, Set<String>> outboundVertexMap = new HashMap<Vertex, Set<String>>(); |
| for (Vertex v : vertices.values()) { |
| if (vertexMap.containsKey(v.getName())) { |
| throw new IllegalStateException("DAG contains multiple vertices" |
| + " with name: " + v.getName()); |
| } |
| vertexMap.put(v.getName(), new AnnotatedVertex(v)); |
| } |
| |
| Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>(); |
| for (Edge e : edges) { |
| // Construct structure for cycle detection |
| Vertex inputVertex = e.getInputVertex(); |
| Vertex outputVertex = e.getOutputVertex(); |
| List<Edge> edgeList = edgeMap.get(inputVertex); |
| if (edgeList == null) { |
| edgeList = new ArrayList<Edge>(); |
| edgeMap.put(inputVertex, edgeList); |
| } |
| edgeList.add(e); |
| |
| // Construct map for Input name verification |
| Set<String> inboundSet = inboundVertexMap.get(outputVertex); |
| if (inboundSet == null) { |
| inboundSet = new HashSet<String>(); |
| inboundVertexMap.put(outputVertex, inboundSet); |
| } |
| inboundSet.add(inputVertex.getName()); |
| |
| // Construct map for Output name verification |
| Set<String> outboundSet = outboundVertexMap.get(inputVertex); |
| if (outboundSet == null) { |
| outboundSet = new HashSet<String>(); |
| outboundVertexMap.put(inputVertex, outboundSet); |
| } |
| outboundSet.add(outputVertex.getName()); |
| } |
| |
| // check input and output names don't collide with vertex names |
| for (Vertex vertex : vertices.values()) { |
| for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> |
| input : vertex.getInputs()) { |
| if (vertexMap.containsKey(input.getName())) { |
| throw new IllegalStateException("Vertex: " |
| + vertex.getName() |
| + " contains an Input with the same name as vertex: " |
| + input.getName()); |
| } |
| } |
| for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> |
| output : vertex.getOutputs()) { |
| if (vertexMap.containsKey(output.getName())) { |
| throw new IllegalStateException("Vertex: " |
| + vertex.getName() |
| + " contains an Output with the same name as vertex: " |
| + output.getName()); |
| } |
| } |
| } |
| |
| // Check for valid InputNames |
| for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) { |
| Vertex vertex = entry.getKey(); |
| for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> |
| input : vertex.getInputs()) { |
| if (entry.getValue().contains(input.getName())) { |
| throw new IllegalStateException("Vertex: " |
| + vertex.getName() |
| + " contains an incoming vertex and Input with the same name: " |
| + input.getName()); |
| } |
| } |
| } |
| |
| // Check for valid OutputNames |
| for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) { |
| Vertex vertex = entry.getKey(); |
| for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> |
| output : vertex.getOutputs()) { |
| if (entry.getValue().contains(output.getName())) { |
| throw new IllegalStateException("Vertex: " |
| + vertex.getName() |
| + " contains an outgoing vertex and Output with the same name: " |
| + output.getName()); |
| } |
| } |
| } |
| |
| |
| // Not checking for repeated input names / output names vertex names on the same vertex, |
| // since we only allow 1 at the moment. |
| // When additional inputs are supported, this can be chceked easily (and early) |
| // within the addInput / addOutput call itself. |
| |
| Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap); |
| |
| checkAndInferOneToOneParallelism(); |
| |
| if (restricted) { |
| for (Edge e : edges) { |
| DataSourceType dataSourceType = e.getEdgeProperty().getDataSourceType(); |
| if (dataSourceType != DataSourceType.PERSISTED && |
| dataSourceType != DataSourceType.EPHEMERAL) { |
| throw new IllegalStateException( |
| "Unsupported source type on edge. " + e); |
| } |
| } |
| } |
| |
| // check for conflicts between dag level local resource and vertex level local resource |
| |
| |
| return topologicalVertexStack; |
| } |
| |
| @VisibleForTesting |
| void verifyLocalResources(Configuration tezConf) { |
| for (Vertex v : vertices.values()) { |
| for (Map.Entry<String, LocalResource> localResource : v |
| .getTaskLocalFiles().entrySet()) { |
| String resourceName = localResource.getKey(); |
| LocalResource resource = localResource.getValue(); |
| if (commonTaskLocalFiles.containsKey(resourceName) |
| && !commonTaskLocalFiles.get(resourceName).equals(resource)) { |
| // Different for some reason. Compare size, and then eventually hash |
| try { |
| |
| LocalResource commonLr = commonTaskLocalFiles.get(resourceName); |
| if (resource.getSize() != commonLr.getSize()) { |
| throw new IllegalStateException( |
| "There is conflicting local resource (size mismatch) (" + |
| resourceName |
| + ") between dag local resource and vertex " + |
| v.getName() + " local resource. " |
| + "\nResource of dag : " + |
| commonTaskLocalFiles.get(resourceName) |
| + "\nResource of vertex: " + resource); |
| } |
| |
| Path vertexResourcePath = |
| ConverterUtils.getPathFromYarnURL(resource.getResource()); |
| Path commonResourcePath = ConverterUtils.getPathFromYarnURL( |
| commonLr.getResource()); |
| |
| byte[] vertexResourceSha = TezClientUtils |
| .getResourceSha(vertexResourcePath.toUri(), tezConf); |
| byte[] commonResourceSha = TezClientUtils |
| .getResourceSha(commonResourcePath.toUri(), tezConf); |
| |
| if (!Arrays.equals(vertexResourceSha, commonResourceSha)) { |
| throw new IllegalStateException( |
| "There is conflicting local resource (sha mismatch) (" + |
| resourceName |
| + ") between dag local resource and vertex " + |
| v.getName() + " local resource. " |
| + "\nResource of dag : " + |
| commonTaskLocalFiles.get(resourceName) |
| + "\nResource of vertex: " + resource); |
| } |
| |
| } catch (URISyntaxException | IOException e) { |
| throw new RuntimeException( |
| "Failed while attempting to validate sha for conflicting resources (" + |
| resourceName |
| + ") between dag local resource and vertex " + v.getName() + |
| " local resource. " |
| + "\nResource of dag : " + |
| commonTaskLocalFiles.get(resourceName) |
| + "\nResource of vertex: " + resource); |
| } |
| } |
| } |
| } |
| } |
| |
| // Adaptation of Tarjan's algorithm for connected components. |
| // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm |
| private Deque<String> detectCycles(Map<Vertex, List<Edge>> edgeMap, |
| Map<String, AnnotatedVertex> vertexMap) |
| throws IllegalStateException { |
| Deque<String> topologicalVertexStack = new LinkedList<String>(); |
| Integer nextIndex = 0; // boxed integer so it is passed by reference. |
| Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>(); |
| for (AnnotatedVertex av : vertexMap.values()) { |
| if (av.index == -1) { |
| assert stack.empty(); |
| strongConnect(av, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack); |
| } |
| } |
| return topologicalVertexStack; |
| } |
| |
| // part of Tarjan's algorithm for connected components. |
| // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm |
| private void strongConnect( |
| AnnotatedVertex av, |
| Map<String, AnnotatedVertex> vertexMap, |
| Map<Vertex, List<Edge>> edgeMap, |
| Stack<AnnotatedVertex> stack, |
| Integer nextIndex, |
| Deque<String> topologicalVertexStack) throws IllegalStateException { |
| av.index = nextIndex; |
| av.lowlink = nextIndex; |
| nextIndex++; |
| stack.push(av); |
| av.onstack = true; |
| |
| List<Edge> edges = edgeMap.get(av.v); |
| if (edges != null) { |
| for (Edge e : edgeMap.get(av.v)) { |
| AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName()); |
| if (outVertex.index == -1) { |
| strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack); |
| av.lowlink = Math.min(av.lowlink, outVertex.lowlink); |
| } else if (outVertex.onstack) { |
| // strongly connected component detected, but we will wait till later so that the full cycle can be displayed. |
| // update lowlink in case outputVertex should be considered the root of this component. |
| av.lowlink = Math.min(av.lowlink, outVertex.index); |
| } |
| } |
| } |
| |
| if (av.lowlink == av.index) { |
| AnnotatedVertex pop = stack.pop(); |
| pop.onstack = false; |
| if (pop != av) { |
| // there was something on the stack other than this "av". |
| // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av" |
| StringBuilder message = new StringBuilder(); |
| message.append(av.v.getName()).append(" <- "); |
| for (; pop != av; pop = stack.pop()) { |
| message.append(pop.v.getName()).append(" <- "); |
| pop.onstack = false; |
| } |
| message.append(av.v.getName()); |
| throw new IllegalStateException("DAG contains a cycle: " + message); |
| } else { |
| // detect self-cycle |
| if (edgeMap.containsKey(pop.v)) { |
| for (Edge edge : edgeMap.get(pop.v)) { |
| if (edge.getOutputVertex().equals(pop.v)) { |
| throw new IllegalStateException("DAG contains a self-cycle on vertex:" + pop.v.getName()); |
| } |
| } |
| } |
| } |
| topologicalVertexStack.push(av.v.getName()); |
| } |
| } |
| |
| // create protobuf message describing DAG |
| @Private |
| public DAGPlan createDag(Configuration tezConf, Credentials extraCredentials, |
| Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, |
| boolean tezLrsAsArchive) { |
| return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive, |
| null, null); |
| } |
| |
| // create protobuf message describing DAG |
| @Private |
| public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials, |
| Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, |
| boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor, |
| JavaOptsChecker javaOptsChecker) { |
| Deque<String> topologicalVertexStack = verify(true); |
| verifyLocalResources(tezConf); |
| |
| DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); |
| dagBuilder.setName(this.name); |
| |
| if (this.callerContext != null) { |
| dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext)); |
| } |
| if (this.dagInfo != null && !this.dagInfo.isEmpty()) { |
| dagBuilder.setDagInfo(this.dagInfo); |
| } |
| |
| // Setup default execution context. |
| VertexExecutionContext defaultContext = getDefaultExecutionContext(); |
| verifyExecutionContext(defaultContext, servicePluginsDescriptor, "DAGDefault"); |
| if (defaultContext != null) { |
| DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto( |
| defaultContext); |
| dagBuilder.setDefaultExecutionContext(contextProto); |
| } |
| |
| if (!vertexGroups.isEmpty()) { |
| for (VertexGroup av : vertexGroups) { |
| GroupInfo groupInfo = av.getGroupInfo(); |
| PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder(); |
| groupBuilder.setGroupName(groupInfo.getGroupName()); |
| for (Vertex v : groupInfo.getMembers()) { |
| groupBuilder.addGroupMembers(v.getName()); |
| } |
| groupBuilder.addAllOutputs(groupInfo.outputs); |
| for (Map.Entry<String, InputDescriptor> entry : |
| groupInfo.edgeMergedInputs.entrySet()) { |
| groupBuilder.addEdgeMergedInputs( |
| PlanGroupInputEdgeInfo.newBuilder().setDestVertexName(entry.getKey()). |
| setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue()))); |
| } |
| dagBuilder.addVertexGroups(groupBuilder); |
| } |
| } |
| |
| Credentials dagCredentials = new Credentials(); |
| if (extraCredentials != null) { |
| dagCredentials.mergeAll(extraCredentials); |
| } |
| dagCredentials.mergeAll(credentials); |
| if (!commonTaskLocalFiles.isEmpty()) { |
| dagBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(commonTaskLocalFiles)); |
| } |
| |
| Preconditions.checkArgument(topologicalVertexStack.size() == vertices.size(), |
| "size of topologicalVertexStack is:" + topologicalVertexStack.size() + |
| " while size of vertices is:" + vertices.size() + |
| ", make sure they are the same in order to sort the vertices"); |
| while(!topologicalVertexStack.isEmpty()) { |
| Vertex vertex = vertices.get(topologicalVertexStack.pop()); |
| // infer credentials, resources and parallelism from data source |
| Resource vertexTaskResource = vertex.getTaskResource(); |
| if (vertexTaskResource == null) { |
| vertexTaskResource = Resource.newInstance(tezConf.getInt( |
| TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, |
| TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), tezConf.getInt( |
| TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES, |
| TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT)); |
| } |
| Map<String, LocalResource> vertexLRs = Maps.newHashMap(); |
| vertexLRs.putAll(vertex.getTaskLocalFiles()); |
| List<DataSourceDescriptor> dataSources = vertex.getDataSources(); |
| for (DataSourceDescriptor dataSource : dataSources) { |
| if (dataSource.getCredentials() != null) { |
| dagCredentials.addAll(dataSource.getCredentials()); |
| } |
| if (dataSource.getAdditionalLocalFiles() != null) { |
| TezCommonUtils |
| .addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs, |
| "Vertex " + vertex.getName()); |
| } |
| } |
| if (tezJarResources != null) { |
| TezCommonUtils |
| .addAdditionalLocalResources(tezJarResources, vertexLRs, "Vertex " + vertex.getName()); |
| } |
| if (binaryConfig != null) { |
| vertexLRs.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, binaryConfig); |
| } |
| int vertexParallelism = vertex.getParallelism(); |
| VertexLocationHint vertexLocationHint = vertex.getLocationHint(); |
| if (dataSources.size() == 1) { |
| DataSourceDescriptor dataSource = dataSources.get(0); |
| if (vertexParallelism == -1 && dataSource.getNumberOfShards() > -1) { |
| vertexParallelism = dataSource.getNumberOfShards(); |
| } |
| if (vertexLocationHint == null && dataSource.getLocationHint() != null) { |
| vertexLocationHint = dataSource.getLocationHint(); |
| } |
| } |
| if (vertexParallelism == -1) { |
| Preconditions.checkState(vertexLocationHint == null, |
| "Cannot specify vertex location hint without specifying vertex parallelism. Vertex: " |
| + vertex.getName()); |
| } else if (vertexLocationHint != null) { |
| Preconditions.checkState(vertexParallelism == vertexLocationHint.getTaskLocationHints().size(), |
| "vertex task location hint must equal vertex parallelism. Vertex: " + vertex.getName()); |
| } |
| for (DataSinkDescriptor dataSink : vertex.getDataSinks()) { |
| if (dataSink.getCredentials() != null) { |
| dagCredentials.addAll(dataSink.getCredentials()); |
| } |
| } |
| |
| VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder(); |
| vertexBuilder.setName(vertex.getName()); |
| vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46. |
| vertexBuilder.setProcessorDescriptor(DagTypeConverters |
| .convertToDAGPlan(vertex.getProcessorDescriptor())); |
| |
| // Vertex ExecutionContext setup |
| VertexExecutionContext execContext = vertex.getVertexExecutionContext(); |
| verifyExecutionContext(execContext, servicePluginsDescriptor, vertex.getName()); |
| if (execContext != null) { |
| DAGProtos.VertexExecutionContextProto contextProto = |
| DagTypeConverters.convertToProto(execContext); |
| vertexBuilder.setExecutionContext(contextProto); |
| } |
| // End of VertexExecutionContext setup. |
| |
| if (vertex.getInputs().size() > 0) { |
| for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : vertex.getInputs()) { |
| vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input)); |
| } |
| } |
| if (vertex.getOutputs().size() > 0) { |
| for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : vertex.getOutputs()) { |
| vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output)); |
| } |
| } |
| |
| if (vertex.getConf()!= null && vertex.getConf().size() > 0) { |
| ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder(); |
| TezUtils.populateConfProtoFromEntries(vertex.getConf().entrySet(), confBuilder); |
| vertexBuilder.setVertexConf(confBuilder); |
| } |
| |
| //task config |
| PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder(); |
| taskConfigBuilder.setNumTasks(vertexParallelism); |
| taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory()); |
| taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores()); |
| |
| try { |
| taskConfigBuilder.setJavaOpts( |
| TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), tezConf, |
| javaOptsChecker)); |
| } catch (TezException e) { |
| throw new TezUncheckedException("Invalid TaskLaunchCmdOpts defined for Vertex " |
| + vertex.getName() + " : " + e.getMessage(), e); |
| } |
| |
| taskConfigBuilder.setTaskModule(vertex.getName()); |
| if (!vertexLRs.isEmpty()) { |
| taskConfigBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(vertexLRs)); |
| } |
| |
| Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment()); |
| TezYARNUtils.setupDefaultEnv(taskEnv, tezConf, |
| TezConfiguration.TEZ_TASK_LAUNCH_ENV, |
| TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, |
| TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_ENV, |
| TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_ENV_DEFAULT, |
| tezLrsAsArchive); |
| for (Map.Entry<String, String> entry : taskEnv.entrySet()) { |
| PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder(); |
| envSettingBuilder.setKey(entry.getKey()); |
| envSettingBuilder.setValue(entry.getValue()); |
| taskConfigBuilder.addEnvironmentSetting(envSettingBuilder); |
| } |
| |
| if (vertexLocationHint != null) { |
| if (vertexLocationHint.getTaskLocationHints() != null) { |
| for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) { |
| PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder(); |
| // we can allow this later on if needed |
| if (hint.getAffinitizedTask() != null) { |
| throw new TezUncheckedException( |
| "Task based affinity may not be specified via the DAG API"); |
| } |
| |
| if (hint.getHosts() != null) { |
| taskLocationHintBuilder.addAllHost(hint.getHosts()); |
| } |
| if (hint.getRacks() != null) { |
| taskLocationHintBuilder.addAllRack(hint.getRacks()); |
| } |
| |
| vertexBuilder.addTaskLocationHint(taskLocationHintBuilder); |
| } |
| } |
| } |
| |
| if (vertex.getVertexManagerPlugin() != null) { |
| vertexBuilder.setVertexManagerPlugin(DagTypeConverters |
| .convertToDAGPlan(vertex.getVertexManagerPlugin())); |
| } |
| |
| for (Edge inEdge : vertex.getInputEdges()) { |
| vertexBuilder.addInEdgeId(inEdge.getId()); |
| } |
| |
| for (Edge outEdge : vertex.getOutputEdges()) { |
| vertexBuilder.addOutEdgeId(outEdge.getId()); |
| } |
| |
| vertexBuilder.setTaskConfig(taskConfigBuilder); |
| dagBuilder.addVertex(vertexBuilder); |
| } |
| |
| for (Edge edge : edges) { |
| EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder(); |
| edgeBuilder.setId(edge.getId()); |
| edgeBuilder.setInputVertexName(edge.getInputVertex().getName()); |
| edgeBuilder.setOutputVertexName(edge.getOutputVertex().getName()); |
| edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType())); |
| edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType())); |
| edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType())); |
| edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource())); |
| edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination())); |
| if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM) { |
| if (edge.getEdgeProperty().getEdgeManagerDescriptor() != null) { |
| edgeBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeManagerDescriptor())); |
| } // else the AM will deal with this. |
| } |
| dagBuilder.addEdge(edgeBuilder); |
| } |
| |
| if (dagAccessControls != null) { |
| dagBuilder.setAclInfo(DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls)); |
| } |
| |
| ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); |
| if (!this.dagConf.isEmpty()) { |
| TezUtils.populateConfProtoFromEntries(this.dagConf.entrySet(), confProtoBuilder); |
| } |
| // Copy historyLogLevel from tezConf into dagConf if its not overridden in dagConf. |
| String logLevel = this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); |
| if (logLevel != null) { |
| // The config is from dagConf, we have already added it to the proto above, just check if |
| // the value is valid. |
| if (!HistoryLogLevel.validateLogLevel(logLevel)) { |
| throw new IllegalArgumentException( |
| "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL + |
| " is set to invalid value: " + logLevel); |
| } |
| } else { |
| // Validate and set value from tezConf. |
| logLevel = tezConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); |
| if (logLevel != null) { |
| if (!HistoryLogLevel.validateLogLevel(logLevel)) { |
| throw new IllegalArgumentException( |
| "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL + |
| " is set to invalid value: " + logLevel); |
| } |
| PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); |
| kvp.setKey(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); |
| kvp.setValue(logLevel); |
| confProtoBuilder.addConfKeyValues(kvp); |
| } |
| } |
| dagBuilder.setDagConf(confProtoBuilder); |
| |
| if (dagCredentials != null) { |
| dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials)); |
| TezCommonUtils.logCredentials(LOG, dagCredentials, "dag"); |
| } |
| |
| return dagBuilder.build(); |
| } |
| |
| private void verifyExecutionContext(VertexExecutionContext executionContext, |
| ServicePluginsDescriptor servicePluginsDescriptor, |
| String context) { |
| if (executionContext != null) { |
| if (executionContext.shouldExecuteInContainers()) { |
| if (servicePluginsDescriptor == null || !servicePluginsDescriptor.areContainersEnabled()) { |
| throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + |
| " specifies container execution but this is disabled in the ServicePluginDescriptor"); |
| } |
| } |
| if (executionContext.shouldExecuteInAm()) { |
| if (servicePluginsDescriptor == null || !servicePluginsDescriptor.isUberEnabled()) { |
| throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + |
| " specifies AM execution but this is disabled in the ServicePluginDescriptor"); |
| } |
| } |
| if (executionContext.getTaskSchedulerName() != null) { |
| boolean found = false; |
| if (servicePluginsDescriptor != null) { |
| found = checkNamedEntityExists(executionContext.getTaskSchedulerName(), |
| servicePluginsDescriptor.getTaskSchedulerDescriptors()); |
| } |
| if (!found) { |
| throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + |
| " specifies task scheduler as " + executionContext.getTaskSchedulerName() + |
| " which is not part of the ServicePluginDescriptor"); |
| } |
| } |
| if (executionContext.getContainerLauncherName() != null) { |
| boolean found = false; |
| if (servicePluginsDescriptor != null) { |
| found = checkNamedEntityExists(executionContext.getContainerLauncherName(), |
| servicePluginsDescriptor.getContainerLauncherDescriptors()); |
| } |
| if (!found) { |
| throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + |
| " specifies container launcher as " + executionContext.getContainerLauncherName() + |
| " which is not part of the ServicePluginDescriptor"); |
| } |
| } |
| if (executionContext.getTaskCommName() != null) { |
| boolean found = false; |
| if (servicePluginsDescriptor != null) { |
| found = checkNamedEntityExists(executionContext.getTaskCommName(), |
| servicePluginsDescriptor.getTaskCommunicatorDescriptors()); |
| } |
| if (!found) { |
| throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context + |
| " specifies task communicator as " + executionContext.getTaskCommName() + |
| " which is not part of the ServicePluginDescriptor"); |
| } |
| } |
| } |
| } |
| |
| private boolean checkNamedEntityExists(String expected, NamedEntityDescriptor[] namedEntities) { |
| if (namedEntities == null) { |
| return false; |
| } |
| for (NamedEntityDescriptor named : namedEntities) { |
| if (named.getEntityName().equals(expected)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public synchronized CallerContext getCallerContext() { |
| return this.callerContext; |
| } |
| |
| } |