| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.jobgraph; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.operators.ResourceSpec; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.core.io.InputSplitSource; |
| import org.apache.flink.runtime.OperatorIDPair; |
| import org.apache.flink.runtime.io.network.partition.ResultPartitionType; |
| import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; |
| import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; |
| import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; |
| import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; |
| import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.SerializedValue; |
| |
| import javax.annotation.Nullable; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** The base class for job vertexes. */ |
| public class JobVertex implements java.io.Serializable { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static final String DEFAULT_NAME = "(unnamed vertex)"; |
| |
| public static final int MAX_PARALLELISM_DEFAULT = -1; |
| |
| // -------------------------------------------------------------------------------------------- |
| // Members that define the structure / topology of the graph |
| // -------------------------------------------------------------------------------------------- |
| |
| /** The ID of the vertex. */ |
| private final JobVertexID id; |
| |
| /** |
| * The IDs of all operators contained in this vertex. |
| * |
| * <p>The ID pairs are stored depth-first post-order; for the forking chain below the ID's would |
| * be stored as [D, E, B, C, A]. |
| * |
| * <pre> |
| * A - B - D |
| * \ \ |
| * C E |
| * </pre> |
| * |
| * <p>This is the same order that operators are stored in the {@code StreamTask}. |
| */ |
| private final List<OperatorIDPair> operatorIDs; |
| |
| /** List of produced data sets, one per writer. */ |
| private final ArrayList<IntermediateDataSet> results = new ArrayList<>(); |
| |
| /** List of edges with incoming data. One per Reader. */ |
| private final ArrayList<JobEdge> inputs = new ArrayList<>(); |
| |
| /** The list of factories for operator coordinators. */ |
| private final ArrayList<SerializedValue<OperatorCoordinator.Provider>> operatorCoordinators = |
| new ArrayList<>(); |
| |
| /** Number of subtasks to split this task into at runtime. */ |
| private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; |
| |
| /** Maximum number of subtasks to split this task into a runtime. */ |
| private int maxParallelism = MAX_PARALLELISM_DEFAULT; |
| |
| /** The minimum resource of the vertex. */ |
| private ResourceSpec minResources = ResourceSpec.DEFAULT; |
| |
| /** The preferred resource of the vertex. */ |
| private ResourceSpec preferredResources = ResourceSpec.DEFAULT; |
| |
| /** Custom configuration passed to the assigned task at runtime. */ |
| private Configuration configuration; |
| |
| /** The class of the invokable. */ |
| private String invokableClassName; |
| |
| /** Indicates of this job vertex is stoppable or not. */ |
| private boolean isStoppable = false; |
| |
| /** Optionally, a source of input splits. */ |
| private InputSplitSource<?> inputSplitSource; |
| |
| /** |
| * The name of the vertex. This will be shown in runtime logs and will be in the runtime |
| * environment. |
| */ |
| private String name; |
| |
| /** |
| * Optionally, a sharing group that allows subtasks from different job vertices to run |
| * concurrently in one slot. |
| */ |
| @Nullable private SlotSharingGroup slotSharingGroup; |
| |
| /** The group inside which the vertex subtasks share slots. */ |
| @Nullable private CoLocationGroupImpl coLocationGroup; |
| |
| /** |
| * Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON |
| * plan. |
| */ |
| private String operatorName; |
| |
| /** |
| * Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce', to be |
| * included in the JSON plan. |
| */ |
| private String operatorDescription; |
| |
| /** Optional, pretty name of the operator, to be displayed in the JSON plan. */ |
| private String operatorPrettyName; |
| |
| /** |
| * Optional, the JSON for the optimizer properties of the operator result, to be included in the |
| * JSON plan. |
| */ |
| private String resultOptimizerProperties; |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Constructs a new job vertex and assigns it with the given name. |
| * |
| * @param name The name of the new job vertex. |
| */ |
| public JobVertex(String name) { |
| this(name, null); |
| } |
| |
| /** |
| * Constructs a new job vertex and assigns it with the given name. |
| * |
| * @param name The name of the new job vertex. |
| * @param id The id of the job vertex. |
| */ |
| public JobVertex(String name, JobVertexID id) { |
| this.name = name == null ? DEFAULT_NAME : name; |
| this.id = id == null ? new JobVertexID() : id; |
| OperatorIDPair operatorIDPair = |
| OperatorIDPair.generatedIDOnly(OperatorID.fromJobVertexID(this.id)); |
| this.operatorIDs = Collections.singletonList(operatorIDPair); |
| } |
| |
| /** |
| * Constructs a new job vertex and assigns it with the given name. |
| * |
| * @param name The name of the new job vertex. |
| * @param primaryId The id of the job vertex. |
| * @param operatorIDPairs The operator ID pairs of the job vertex. |
| */ |
| public JobVertex(String name, JobVertexID primaryId, List<OperatorIDPair> operatorIDPairs) { |
| this.name = name == null ? DEFAULT_NAME : name; |
| this.id = primaryId == null ? new JobVertexID() : primaryId; |
| this.operatorIDs = Collections.unmodifiableList(operatorIDPairs); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Returns the ID of this job vertex. |
| * |
| * @return The ID of this job vertex |
| */ |
| public JobVertexID getID() { |
| return this.id; |
| } |
| |
| /** |
| * Returns the name of the vertex. |
| * |
| * @return The name of the vertex. |
| */ |
| public String getName() { |
| return this.name; |
| } |
| |
| /** |
| * Sets the name of the vertex. |
| * |
| * @param name The new name. |
| */ |
| public void setName(String name) { |
| this.name = name == null ? DEFAULT_NAME : name; |
| } |
| |
| /** |
| * Returns the number of produced intermediate data sets. |
| * |
| * @return The number of produced intermediate data sets. |
| */ |
| public int getNumberOfProducedIntermediateDataSets() { |
| return this.results.size(); |
| } |
| |
| /** |
| * Returns the number of inputs. |
| * |
| * @return The number of inputs. |
| */ |
| public int getNumberOfInputs() { |
| return this.inputs.size(); |
| } |
| |
| public List<OperatorIDPair> getOperatorIDs() { |
| return operatorIDs; |
| } |
| |
| /** |
| * Returns the vertex's configuration object which can be used to pass custom settings to the |
| * task at runtime. |
| * |
| * @return the vertex's configuration object |
| */ |
| public Configuration getConfiguration() { |
| if (this.configuration == null) { |
| this.configuration = new Configuration(); |
| } |
| return this.configuration; |
| } |
| |
| public void setInvokableClass(Class<? extends AbstractInvokable> invokable) { |
| Preconditions.checkNotNull(invokable); |
| this.invokableClassName = invokable.getName(); |
| } |
| |
| /** |
| * Returns the name of the invokable class which represents the task of this vertex. |
| * |
| * @return The name of the invokable class, <code>null</code> if not set. |
| */ |
| public String getInvokableClassName() { |
| return this.invokableClassName; |
| } |
| |
| /** |
| * Returns the invokable class which represents the task of this vertex. |
| * |
| * @param cl The classloader used to resolve user-defined classes |
| * @return The invokable class, <code>null</code> if it is not set |
| */ |
| public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) { |
| if (cl == null) { |
| throw new NullPointerException("The classloader must not be null."); |
| } |
| if (invokableClassName == null) { |
| return null; |
| } |
| |
| try { |
| return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class); |
| } catch (ClassNotFoundException e) { |
| throw new RuntimeException("The user-code class could not be resolved.", e); |
| } catch (ClassCastException e) { |
| throw new RuntimeException( |
| "The user-code class is no subclass of " + AbstractInvokable.class.getName(), |
| e); |
| } |
| } |
| |
| /** |
| * Gets the parallelism of the task. |
| * |
| * @return The parallelism of the task. |
| */ |
| public int getParallelism() { |
| return parallelism; |
| } |
| |
| /** |
| * Sets the parallelism for the task. |
| * |
| * @param parallelism The parallelism for the task. |
| */ |
| public void setParallelism(int parallelism) { |
| if (parallelism < 1) { |
| throw new IllegalArgumentException("The parallelism must be at least one."); |
| } |
| this.parallelism = parallelism; |
| } |
| |
| /** |
| * Gets the maximum parallelism for the task. |
| * |
| * @return The maximum parallelism for the task. |
| */ |
| public int getMaxParallelism() { |
| return maxParallelism; |
| } |
| |
| /** |
| * Sets the maximum parallelism for the task. |
| * |
| * @param maxParallelism The maximum parallelism to be set. must be between 1 and |
| * Short.MAX_VALUE. |
| */ |
| public void setMaxParallelism(int maxParallelism) { |
| this.maxParallelism = maxParallelism; |
| } |
| |
| /** |
| * Gets the minimum resource for the task. |
| * |
| * @return The minimum resource for the task. |
| */ |
| public ResourceSpec getMinResources() { |
| return minResources; |
| } |
| |
| /** |
| * Gets the preferred resource for the task. |
| * |
| * @return The preferred resource for the task. |
| */ |
| public ResourceSpec getPreferredResources() { |
| return preferredResources; |
| } |
| |
| /** |
| * Sets the minimum and preferred resources for the task. |
| * |
| * @param minResources The minimum resource for the task. |
| * @param preferredResources The preferred resource for the task. |
| */ |
| public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) { |
| this.minResources = checkNotNull(minResources); |
| this.preferredResources = checkNotNull(preferredResources); |
| } |
| |
| public InputSplitSource<?> getInputSplitSource() { |
| return inputSplitSource; |
| } |
| |
| public void setInputSplitSource(InputSplitSource<?> inputSplitSource) { |
| this.inputSplitSource = inputSplitSource; |
| } |
| |
| public List<IntermediateDataSet> getProducedDataSets() { |
| return this.results; |
| } |
| |
| public List<JobEdge> getInputs() { |
| return this.inputs; |
| } |
| |
| public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators() { |
| return Collections.unmodifiableList(operatorCoordinators); |
| } |
| |
| public void addOperatorCoordinator( |
| SerializedValue<OperatorCoordinator.Provider> serializedCoordinatorProvider) { |
| operatorCoordinators.add(serializedCoordinatorProvider); |
| } |
| |
| /** |
| * Associates this vertex with a slot sharing group for scheduling. Different vertices in the |
| * same slot sharing group can run one subtask each in the same slot. |
| * |
| * @param grp The slot sharing group to associate the vertex with. |
| */ |
| public void setSlotSharingGroup(SlotSharingGroup grp) { |
| checkNotNull(grp); |
| |
| if (this.slotSharingGroup != null) { |
| this.slotSharingGroup.removeVertexFromGroup(this.getID()); |
| } |
| |
| grp.addVertexToGroup(this.getID()); |
| this.slotSharingGroup = grp; |
| } |
| |
| /** |
| * Gets the slot sharing group that this vertex is associated with. Different vertices in the |
| * same slot sharing group can run one subtask each in the same slot. |
| * |
| * @return The slot sharing group to associate the vertex with |
| */ |
| public SlotSharingGroup getSlotSharingGroup() { |
| if (slotSharingGroup == null) { |
| // create a new slot sharing group for this vertex if it was in no other slot sharing |
| // group. |
| // this should only happen in testing cases at the moment because production code path |
| // will |
| // always set a value to it before used |
| setSlotSharingGroup(new SlotSharingGroup()); |
| } |
| return slotSharingGroup; |
| } |
| |
| /** |
| * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex. |
| * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel |
| * computing instance (TaskManager) as the n'th subtask of the given vertex. |
| * |
| * <p>NOTE: Co-location is only possible between vertices in a slot sharing group. |
| * |
| * <p>NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That |
| * means that the respective vertex must be a (transitive) input of this vertex. |
| * |
| * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks |
| * with. |
| * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are |
| * not in a common slot sharing group. |
| * @see #setSlotSharingGroup(SlotSharingGroup) |
| */ |
| public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) { |
| if (this.slotSharingGroup == null |
| || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) { |
| throw new IllegalArgumentException( |
| "Strict co-location requires that both vertices are in the same slot sharing group."); |
| } |
| |
| CoLocationGroupImpl thisGroup = this.coLocationGroup; |
| CoLocationGroupImpl otherGroup = strictlyCoLocatedWith.coLocationGroup; |
| |
| if (otherGroup == null) { |
| if (thisGroup == null) { |
| CoLocationGroupImpl group = new CoLocationGroupImpl(this, strictlyCoLocatedWith); |
| this.coLocationGroup = group; |
| strictlyCoLocatedWith.coLocationGroup = group; |
| } else { |
| thisGroup.addVertex(strictlyCoLocatedWith); |
| strictlyCoLocatedWith.coLocationGroup = thisGroup; |
| } |
| } else { |
| if (thisGroup == null) { |
| otherGroup.addVertex(this); |
| this.coLocationGroup = otherGroup; |
| } else { |
| // both had yet distinct groups, we need to merge them |
| thisGroup.mergeInto(otherGroup); |
| } |
| } |
| } |
| |
| @Nullable |
| public CoLocationGroup getCoLocationGroup() { |
| return coLocationGroup; |
| } |
| |
| public void updateCoLocationGroup(CoLocationGroupImpl group) { |
| this.coLocationGroup = group; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) { |
| return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType); |
| } |
| |
| public IntermediateDataSet createAndAddResultDataSet( |
| IntermediateDataSetID id, ResultPartitionType partitionType) { |
| |
| IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this); |
| this.results.add(result); |
| return result; |
| } |
| |
| public JobEdge connectDataSetAsInput( |
| IntermediateDataSet dataSet, DistributionPattern distPattern) { |
| JobEdge edge = new JobEdge(dataSet, this, distPattern); |
| this.inputs.add(edge); |
| dataSet.addConsumer(edge); |
| return edge; |
| } |
| |
| public JobEdge connectNewDataSetAsInput( |
| JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) { |
| |
| IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); |
| |
| JobEdge edge = new JobEdge(dataSet, this, distPattern); |
| this.inputs.add(edge); |
| dataSet.addConsumer(edge); |
| return edge; |
| } |
| |
| public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) { |
| JobEdge edge = new JobEdge(dataSetId, this, distPattern); |
| this.inputs.add(edge); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| public boolean isInputVertex() { |
| return this.inputs.isEmpty(); |
| } |
| |
| public boolean isStoppable() { |
| return this.isStoppable; |
| } |
| |
| public boolean isOutputVertex() { |
| return this.results.isEmpty(); |
| } |
| |
| public boolean hasNoConnectedInputs() { |
| for (JobEdge edge : inputs) { |
| if (!edge.isIdReference()) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * A hook that can be overwritten by sub classes to implement logic that is called by the master |
| * when the job starts. |
| * |
| * @param loader The class loader for user defined code. |
| * @throws Exception The method may throw exceptions which cause the job to fail immediately. |
| */ |
| public void initializeOnMaster(ClassLoader loader) throws Exception {} |
| |
| /** |
| * A hook that can be overwritten by sub classes to implement logic that is called by the master |
| * after the job completed. |
| * |
| * @param loader The class loader for user defined code. |
| * @throws Exception The method may throw exceptions which cause the job to fail immediately. |
| */ |
| public void finalizeOnMaster(ClassLoader loader) throws Exception {} |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| public String getOperatorName() { |
| return operatorName; |
| } |
| |
| public void setOperatorName(String operatorName) { |
| this.operatorName = operatorName; |
| } |
| |
| public String getOperatorDescription() { |
| return operatorDescription; |
| } |
| |
| public void setOperatorDescription(String operatorDescription) { |
| this.operatorDescription = operatorDescription; |
| } |
| |
| public void setOperatorPrettyName(String operatorPrettyName) { |
| this.operatorPrettyName = operatorPrettyName; |
| } |
| |
| public String getOperatorPrettyName() { |
| return operatorPrettyName; |
| } |
| |
| public String getResultOptimizerProperties() { |
| return resultOptimizerProperties; |
| } |
| |
| public void setResultOptimizerProperties(String resultOptimizerProperties) { |
| this.resultOptimizerProperties = resultOptimizerProperties; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| @Override |
| public String toString() { |
| return this.name + " (" + this.invokableClassName + ')'; |
| } |
| } |