| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.api.graph; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.cache.DistributedCache; |
| import org.apache.flink.api.common.io.InputFormat; |
| import org.apache.flink.api.common.io.OutputFormat; |
| import org.apache.flink.api.common.operators.ResourceSpec; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.dag.Pipeline; |
| import org.apache.flink.api.java.functions.KeySelector; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.api.java.tuple.Tuple3; |
| import org.apache.flink.api.java.typeutils.MissingTypeInfo; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.PipelineOptions; |
| import org.apache.flink.core.execution.JobStatusHook; |
| import org.apache.flink.core.memory.ManagedMemoryUseCase; |
| import org.apache.flink.runtime.clusterframework.types.ResourceProfile; |
| import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.JobType; |
| import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; |
| import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; |
| import org.apache.flink.runtime.state.CheckpointStorage; |
| import org.apache.flink.runtime.state.StateBackend; |
| import org.apache.flink.streaming.api.TimeCharacteristic; |
| import org.apache.flink.streaming.api.environment.CheckpointConfig; |
| import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; |
| import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory; |
| import org.apache.flink.streaming.api.operators.SourceOperatorFactory; |
| import org.apache.flink.streaming.api.operators.StreamOperatorFactory; |
| import org.apache.flink.streaming.api.transformations.StreamExchangeMode; |
| import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner; |
| import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner; |
| import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; |
| import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; |
| import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; |
| import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask; |
| import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; |
| import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; |
| import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; |
| import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; |
| import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; |
| import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; |
| import org.apache.flink.util.OutputTag; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * Class representing the streaming topology. It contains all the information necessary to build |
| * the {@link JobGraph} for execution. |
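| * |
| * <p>A minimal usage sketch (a {@code StreamGraph} is normally produced by the {@code |
| * StreamGraphGenerator} from a {@code StreamExecutionEnvironment}, not assembled by hand): |
| * |
| * <pre>{@code |
| * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| * env.fromData(1, 2, 3).map(x -> x * 2).print(); |
| * |
| * StreamGraph streamGraph = env.getStreamGraph(); |
| * JobGraph jobGraph = streamGraph.getJobGraph(); |
| * }</pre> |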
| */ |
| @Internal |
| public class StreamGraph implements Pipeline { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); |
| |
| public static final String ITERATION_SOURCE_NAME_PREFIX = "IterationSource"; |
| |
| public static final String ITERATION_SINK_NAME_PREFIX = "IterationSink"; |
| |
| private String jobName; |
| |
| private final Configuration jobConfiguration; |
| private final ExecutionConfig executionConfig; |
| private final CheckpointConfig checkpointConfig; |
| private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none(); |
| |
| private TimeCharacteristic timeCharacteristic; |
| |
| private GlobalStreamExchangeMode globalExchangeMode; |
| |
| private boolean enableCheckpointsAfterTasksFinish; |
| |
| /** Flag to indicate whether to put all vertices into the same slot sharing group by default. */ |
| private boolean allVerticesInSameSlotSharingGroupByDefault = true; |
| |
| private Map<Integer, StreamNode> streamNodes; |
| private Set<Integer> sources; |
| private Set<Integer> sinks; |
| private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes; |
| private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> |
| virtualPartitionNodes; |
| |
| protected Map<Integer, String> vertexIDtoBrokerID; |
| protected Map<Integer, Long> vertexIDtoLoopTimeout; |
| private StateBackend stateBackend; |
| private CheckpointStorage checkpointStorage; |
| private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; |
| private InternalTimeServiceManager.Provider timerServiceProvider; |
| private JobType jobType = JobType.STREAMING; |
| private Map<String, ResourceProfile> slotSharingGroupResources; |
| private PipelineOptions.VertexDescriptionMode descriptionMode = |
| PipelineOptions.VertexDescriptionMode.TREE; |
| private boolean vertexNameIncludeIndexPrefix = false; |
| |
| private final List<JobStatusHook> jobStatusHooks = new ArrayList<>(); |
| |
| private boolean dynamic; |
| |
| private boolean autoParallelismEnabled; |
| |
| public StreamGraph( |
| Configuration jobConfiguration, |
| ExecutionConfig executionConfig, |
| CheckpointConfig checkpointConfig, |
| SavepointRestoreSettings savepointRestoreSettings) { |
| this.jobConfiguration = new Configuration(checkNotNull(jobConfiguration)); |
| this.executionConfig = checkNotNull(executionConfig); |
| this.checkpointConfig = checkNotNull(checkpointConfig); |
| this.savepointRestoreSettings = checkNotNull(savepointRestoreSettings); |
| |
| // create a new, empty stream graph. |
| clear(); |
| } |
| |
| /** Removes all registered nodes and resets the graph to an empty state. */ |
| public void clear() { |
| streamNodes = new HashMap<>(); |
| virtualSideOutputNodes = new HashMap<>(); |
| virtualPartitionNodes = new HashMap<>(); |
| vertexIDtoBrokerID = new HashMap<>(); |
| vertexIDtoLoopTimeout = new HashMap<>(); |
| iterationSourceSinkPairs = new HashSet<>(); |
| sources = new HashSet<>(); |
| sinks = new HashSet<>(); |
| slotSharingGroupResources = new HashMap<>(); |
| } |
| |
| public ExecutionConfig getExecutionConfig() { |
| return executionConfig; |
| } |
| |
| public Configuration getJobConfiguration() { |
| return jobConfiguration; |
| } |
| |
| public CheckpointConfig getCheckpointConfig() { |
| return checkpointConfig; |
| } |
| |
| public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { |
| this.savepointRestoreSettings = savepointRestoreSettings; |
| } |
| |
| public SavepointRestoreSettings getSavepointRestoreSettings() { |
| return savepointRestoreSettings; |
| } |
| |
| public String getJobName() { |
| return jobName; |
| } |
| |
| public void setJobName(String jobName) { |
| this.jobName = jobName; |
| } |
| |
| public void setStateBackend(StateBackend backend) { |
| this.stateBackend = backend; |
| } |
| |
| public StateBackend getStateBackend() { |
| return this.stateBackend; |
| } |
| |
| public void setCheckpointStorage(CheckpointStorage checkpointStorage) { |
| this.checkpointStorage = checkpointStorage; |
| } |
| |
| public CheckpointStorage getCheckpointStorage() { |
| return this.checkpointStorage; |
| } |
| |
| public InternalTimeServiceManager.Provider getTimerServiceProvider() { |
| return timerServiceProvider; |
| } |
| |
| public void setTimerServiceProvider(InternalTimeServiceManager.Provider timerServiceProvider) { |
| this.timerServiceProvider = checkNotNull(timerServiceProvider); |
| } |
| |
| public Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> getUserArtifacts() { |
| return Optional.ofNullable(jobConfiguration.get(PipelineOptions.CACHED_FILES)) |
| .map(DistributedCache::parseCachedFilesFromString) |
| .orElseGet(ArrayList::new); |
| } |
| |
| public TimeCharacteristic getTimeCharacteristic() { |
| return timeCharacteristic; |
| } |
| |
| public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { |
| this.timeCharacteristic = timeCharacteristic; |
| } |
| |
| public GlobalStreamExchangeMode getGlobalStreamExchangeMode() { |
| return globalExchangeMode; |
| } |
| |
| public void setGlobalStreamExchangeMode(GlobalStreamExchangeMode globalExchangeMode) { |
| this.globalExchangeMode = globalExchangeMode; |
| } |
| |
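| /** |
| * Registers {@link ResourceProfile}s for the given slot sharing groups. Entries are added to |
| * the previously registered resources, overwriting any existing entry for the same group ID. |
| */ |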
| public void setSlotSharingGroupResource( |
| Map<String, ResourceProfile> slotSharingGroupResources) { |
| this.slotSharingGroupResources.putAll(slotSharingGroupResources); |
| } |
| |
| public Optional<ResourceProfile> getSlotSharingGroupResource(String groupId) { |
| return Optional.ofNullable(slotSharingGroupResources.get(groupId)); |
| } |
| |
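| /** |
| * Returns whether any slot sharing group declares resources other than {@link |
| * ResourceProfile#UNKNOWN}. |
| */ |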
| public boolean hasFineGrainedResource() { |
| return slotSharingGroupResources.values().stream() |
| .anyMatch(resourceProfile -> !resourceProfile.equals(ResourceProfile.UNKNOWN)); |
| } |
| |
| /** |
| * Set whether to put all vertices into the same slot sharing group by default. |
| * |
| * @param allVerticesInSameSlotSharingGroupByDefault indicates whether to put all vertices into |
| * the same slot sharing group by default. |
| */ |
| public void setAllVerticesInSameSlotSharingGroupByDefault( |
| boolean allVerticesInSameSlotSharingGroupByDefault) { |
| this.allVerticesInSameSlotSharingGroupByDefault = |
| allVerticesInSameSlotSharingGroupByDefault; |
| } |
| |
| /** |
| * Gets whether to put all vertices into the same slot sharing group by default. |
| * |
| * @return whether to put all vertices into the same slot sharing group by default. |
| */ |
| public boolean isAllVerticesInSameSlotSharingGroupByDefault() { |
| return allVerticesInSameSlotSharingGroupByDefault; |
| } |
| |
| public boolean isEnableCheckpointsAfterTasksFinish() { |
| return enableCheckpointsAfterTasksFinish; |
| } |
| |
| public void setEnableCheckpointsAfterTasksFinish(boolean enableCheckpointsAfterTasksFinish) { |
| this.enableCheckpointsAfterTasksFinish = enableCheckpointsAfterTasksFinish; |
| } |
| |
| // Chaining and iterations |
| |
| public boolean isChainingEnabled() { |
| return jobConfiguration.get(PipelineOptions.OPERATOR_CHAINING); |
| } |
| |
| public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() { |
| return jobConfiguration.get( |
| PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM); |
| } |
| |
| public boolean isIterative() { |
| return !vertexIDtoLoopTimeout.isEmpty(); |
| } |
| |
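| /** |
| * Adds a FLIP-27 source vertex, backed by a {@link SourceOperatorFactory} and executed by a |
| * {@link SourceOperatorStreamTask}. |
| */ |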
| public <IN, OUT> void addSource( |
| Integer vertexID, |
| @Nullable String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| SourceOperatorFactory<OUT> operatorFactory, |
| TypeInformation<IN> inTypeInfo, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName) { |
| addOperator( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| operatorFactory, |
| inTypeInfo, |
| outTypeInfo, |
| operatorName, |
| SourceOperatorStreamTask.class); |
| sources.add(vertexID); |
| } |
| |
| public <IN, OUT> void addLegacySource( |
| Integer vertexID, |
| @Nullable String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| StreamOperatorFactory<OUT> operatorFactory, |
| TypeInformation<IN> inTypeInfo, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName) { |
| addOperator( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| operatorFactory, |
| inTypeInfo, |
| outTypeInfo, |
| operatorName); |
| sources.add(vertexID); |
| } |
| |
| public <IN, OUT> void addSink( |
| Integer vertexID, |
| @Nullable String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| StreamOperatorFactory<OUT> operatorFactory, |
| TypeInformation<IN> inTypeInfo, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName) { |
| addOperator( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| operatorFactory, |
| inTypeInfo, |
| outTypeInfo, |
| operatorName); |
| if (operatorFactory instanceof OutputFormatOperatorFactory) { |
| setOutputFormat( |
| vertexID, ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat()); |
| } |
| sinks.add(vertexID); |
| } |
| |
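| /** |
| * Adds a single-input operator vertex. Legacy stream sources are executed by a {@link |
| * SourceStreamTask}, all other one-input operators by a {@link OneInputStreamTask}. |
| */ |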
| public <IN, OUT> void addOperator( |
| Integer vertexID, |
| @Nullable String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| StreamOperatorFactory<OUT> operatorFactory, |
| TypeInformation<IN> inTypeInfo, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName) { |
| Class<? extends TaskInvokable> invokableClass = |
| operatorFactory.isStreamSource() |
| ? SourceStreamTask.class |
| : OneInputStreamTask.class; |
| addOperator( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| operatorFactory, |
| inTypeInfo, |
| outTypeInfo, |
| operatorName, |
| invokableClass); |
| } |
| |
| private <IN, OUT> void addOperator( |
| Integer vertexID, |
| @Nullable String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| StreamOperatorFactory<OUT> operatorFactory, |
| TypeInformation<IN> inTypeInfo, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName, |
| Class<? extends TaskInvokable> invokableClass) { |
| |
| addNode( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| invokableClass, |
| operatorFactory, |
| operatorName); |
| setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo)); |
| |
| if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) { |
| // sets the output type which must be known at StreamGraph creation time |
| operatorFactory.setOutputType(outTypeInfo, executionConfig); |
| } |
| |
| if (operatorFactory.isInputTypeConfigurable()) { |
| operatorFactory.setInputType(inTypeInfo, executionConfig); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Vertex: {}", vertexID); |
| } |
| } |
| |
| public <IN1, IN2, OUT> void addCoOperator( |
| Integer vertexID, |
| String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| StreamOperatorFactory<OUT> taskOperatorFactory, |
| TypeInformation<IN1> in1TypeInfo, |
| TypeInformation<IN2> in2TypeInfo, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName) { |
| |
| Class<? extends TaskInvokable> vertexClass = TwoInputStreamTask.class; |
| |
| addNode( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| vertexClass, |
| taskOperatorFactory, |
| operatorName); |
| |
| TypeSerializer<OUT> outSerializer = createSerializer(outTypeInfo); |
| |
| setSerializers( |
| vertexID, |
| in1TypeInfo.createSerializer(executionConfig.getSerializerConfig()), |
| in2TypeInfo.createSerializer(executionConfig.getSerializerConfig()), |
| outSerializer); |
| |
| if (taskOperatorFactory.isOutputTypeConfigurable()) { |
| // sets the output type which must be known at StreamGraph creation time |
| taskOperatorFactory.setOutputType(outTypeInfo, executionConfig); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("CO-TASK: {}", vertexID); |
| } |
| } |
| |
| public <OUT> void addMultipleInputOperator( |
| Integer vertexID, |
| String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| StreamOperatorFactory<OUT> operatorFactory, |
| List<TypeInformation<?>> inTypeInfos, |
| TypeInformation<OUT> outTypeInfo, |
| String operatorName) { |
| |
| Class<? extends TaskInvokable> vertexClass = MultipleInputStreamTask.class; |
| |
| addNode( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| vertexClass, |
| operatorFactory, |
| operatorName); |
| |
| setSerializers(vertexID, inTypeInfos, createSerializer(outTypeInfo)); |
| |
| if (operatorFactory.isOutputTypeConfigurable()) { |
| // sets the output type which must be known at StreamGraph creation time |
| operatorFactory.setOutputType(outTypeInfo, executionConfig); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("CO-TASK: {}", vertexID); |
| } |
| } |
| |
| protected StreamNode addNode( |
| Integer vertexID, |
| @Nullable String slotSharingGroup, |
| @Nullable String coLocationGroup, |
| Class<? extends TaskInvokable> vertexClass, |
| @Nullable StreamOperatorFactory<?> operatorFactory, |
| String operatorName) { |
| |
| if (streamNodes.containsKey(vertexID)) { |
| throw new RuntimeException("Duplicate vertexID " + vertexID); |
| } |
| |
| StreamNode vertex = |
| new StreamNode( |
| vertexID, |
| slotSharingGroup, |
| coLocationGroup, |
| operatorFactory, |
| operatorName, |
| vertexClass); |
| |
| streamNodes.put(vertexID, vertex); |
| |
| return vertex; |
| } |
| |
| /** |
| * Adds a new virtual node that is used to connect a downstream vertex to only the outputs with |
| * the selected side-output {@link OutputTag}. |
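| * |
| * <p>A sketch with hypothetical node IDs (1 = original operator, 6 = virtual node, 7 = consumer): |
| * |
| * <pre>{@code |
| * graph.addVirtualSideOutputNode(1, 6, lateDataTag); |
| * graph.addEdge(6, 7, 0); // connects 1 -> 7, delivering only the lateDataTag side output |
| * }</pre> |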
| * |
| * @param originalId ID of the node that should be connected to. |
| * @param virtualId ID of the virtual node. |
| * @param outputTag The selected side-output {@code OutputTag}. |
| */ |
| public void addVirtualSideOutputNode( |
| Integer originalId, Integer virtualId, OutputTag outputTag) { |
| |
| if (virtualSideOutputNodes.containsKey(virtualId)) { |
| throw new IllegalStateException("Already has virtual output node with id " + virtualId); |
| } |
| |
| // verify that we don't already have a virtual node for the given originalId/outputTag |
| // combination with a different TypeInformation. This would indicate that someone is trying |
| // to read a side output from an operation with a different type for the same side output |
| // id. |
| |
| for (Tuple2<Integer, OutputTag> tag : virtualSideOutputNodes.values()) { |
| if (!tag.f0.equals(originalId)) { |
| // different source operator |
| continue; |
| } |
| |
| if (tag.f1.getId().equals(outputTag.getId()) |
| && !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) { |
| throw new IllegalArgumentException( |
| "Trying to add a side output for the same " |
| + "side-output id with a different type. This is not allowed. Side-output ID: " |
| + tag.f1.getId()); |
| } |
| } |
| |
| virtualSideOutputNodes.put(virtualId, new Tuple2<>(originalId, outputTag)); |
| } |
| |
| /** |
| * Adds a new virtual node that is used to connect a downstream vertex to an input with a |
| * certain partitioning. |
| * |
| * <p>When adding an edge from the virtual node to a downstream node, the connection will be |
| * made to the original node, but with the partitioning given here. |
| * |
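| * <p>A sketch with hypothetical node IDs (1 = original operator, 6 = virtual node, 7 = consumer): |
| * |
| * <pre>{@code |
| * graph.addVirtualPartitionNode(1, 6, new RebalancePartitioner<>(), StreamExchangeMode.UNDEFINED); |
| * graph.addEdge(6, 7, 0); // connects 1 -> 7 with rebalance partitioning |
| * }</pre> |
| * |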
| * @param originalId ID of the node that should be connected to. |
| * @param virtualId ID of the virtual node. |
| * @param partitioner The partitioner. |
| * @param exchangeMode The {@link StreamExchangeMode} of the edge. |
| */ |
| public void addVirtualPartitionNode( |
| Integer originalId, |
| Integer virtualId, |
| StreamPartitioner<?> partitioner, |
| StreamExchangeMode exchangeMode) { |
| |
| if (virtualPartitionNodes.containsKey(virtualId)) { |
| throw new IllegalStateException( |
| "Already has virtual partition node with id " + virtualId); |
| } |
| |
| virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, exchangeMode)); |
| } |
| |
| /** Determines the slot sharing group of an operation across virtual nodes. */ |
| public String getSlotSharingGroup(Integer id) { |
| if (virtualSideOutputNodes.containsKey(id)) { |
| Integer mappedId = virtualSideOutputNodes.get(id).f0; |
| return getSlotSharingGroup(mappedId); |
| } else if (virtualPartitionNodes.containsKey(id)) { |
| Integer mappedId = virtualPartitionNodes.get(id).f0; |
| return getSlotSharingGroup(mappedId); |
| } else { |
| StreamNode node = getStreamNode(id); |
| return node.getSlotSharingGroup(); |
| } |
| } |
| |
| public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) { |
| addEdge(upStreamVertexID, downStreamVertexID, typeNumber, null); |
| } |
| |
| public void addEdge( |
| Integer upStreamVertexID, |
| Integer downStreamVertexID, |
| int typeNumber, |
| IntermediateDataSetID intermediateDataSetId) { |
| addEdgeInternal( |
| upStreamVertexID, |
| downStreamVertexID, |
| typeNumber, |
| null, |
| new ArrayList<>(), |
| null, |
| null, |
| intermediateDataSetId); |
| } |
| |
| private void addEdgeInternal( |
| Integer upStreamVertexID, |
| Integer downStreamVertexID, |
| int typeNumber, |
| StreamPartitioner<?> partitioner, |
| List<String> outputNames, |
| OutputTag outputTag, |
| StreamExchangeMode exchangeMode, |
| IntermediateDataSetID intermediateDataSetId) { |
| |
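| // Virtual nodes are resolved recursively: each step substitutes the original node ID and |
| // picks up the side-output tag, partitioner, or exchange mode recorded on the virtual |
| // node, until an edge between two real nodes can be created. |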
| if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { |
| int virtualId = upStreamVertexID; |
| upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; |
| if (outputTag == null) { |
| outputTag = virtualSideOutputNodes.get(virtualId).f1; |
| } |
| addEdgeInternal( |
| upStreamVertexID, |
| downStreamVertexID, |
| typeNumber, |
| partitioner, |
| null, |
| outputTag, |
| exchangeMode, |
| intermediateDataSetId); |
| } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { |
| int virtualId = upStreamVertexID; |
| upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; |
| if (partitioner == null) { |
| partitioner = virtualPartitionNodes.get(virtualId).f1; |
| } |
| exchangeMode = virtualPartitionNodes.get(virtualId).f2; |
| addEdgeInternal( |
| upStreamVertexID, |
| downStreamVertexID, |
| typeNumber, |
| partitioner, |
| outputNames, |
| outputTag, |
| exchangeMode, |
| intermediateDataSetId); |
| } else { |
| createActualEdge( |
| upStreamVertexID, |
| downStreamVertexID, |
| typeNumber, |
| partitioner, |
| outputTag, |
| exchangeMode, |
| intermediateDataSetId); |
| } |
| } |
| |
| private void createActualEdge( |
| Integer upStreamVertexID, |
| Integer downStreamVertexID, |
| int typeNumber, |
| StreamPartitioner<?> partitioner, |
| OutputTag outputTag, |
| StreamExchangeMode exchangeMode, |
| IntermediateDataSetID intermediateDataSetId) { |
| StreamNode upstreamNode = getStreamNode(upStreamVertexID); |
| StreamNode downstreamNode = getStreamNode(downStreamVertexID); |
| |
| // If no partitioner was specified and the parallelism of the upstream and downstream |
| // operators matches, use forward partitioning; otherwise use rebalance. For dynamic |
| // graphs the forward choice remains provisional, since parallelisms may still change. |
| if (partitioner == null |
| && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { |
| partitioner = |
| dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>(); |
| } else if (partitioner == null) { |
| partitioner = new RebalancePartitioner<>(); |
| } |
| |
| if (partitioner instanceof ForwardPartitioner) { |
| if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) { |
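| // A ForwardForConsecutiveHashPartitioner stands in for the hash partitioner between |
| // consecutive hash operations; if forwarding is impossible because the parallelisms |
| // differ, fall back to that hash partitioner instead of failing. |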
| if (partitioner instanceof ForwardForConsecutiveHashPartitioner) { |
| partitioner = |
| ((ForwardForConsecutiveHashPartitioner<?>) partitioner) |
| .getHashPartitioner(); |
| } else { |
| throw new UnsupportedOperationException( |
| "Forward partitioning does not allow " |
| + "change of parallelism. Upstream operation: " |
| + upstreamNode |
| + " parallelism: " |
| + upstreamNode.getParallelism() |
| + ", downstream operation: " |
| + downstreamNode |
| + " parallelism: " |
| + downstreamNode.getParallelism() |
| + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global."); |
| } |
| } |
| } |
| |
| if (exchangeMode == null) { |
| exchangeMode = StreamExchangeMode.UNDEFINED; |
| } |
| |
| /* |
| * Make sure that {@link StreamEdge}s connecting the same pair of nodes (for example as a |
| * result of self-unioning a {@link DataStream}) are distinct and unique. Otherwise it |
| * would be difficult for the {@link StreamTask} to assign {@link RecordWriter}s to the |
| * correct {@link StreamEdge}. |
| */ |
| int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size(); |
| |
| StreamEdge edge = |
| new StreamEdge( |
| upstreamNode, |
| downstreamNode, |
| typeNumber, |
| partitioner, |
| outputTag, |
| exchangeMode, |
| uniqueId, |
| intermediateDataSetId); |
| |
| getStreamNode(edge.getSourceId()).addOutEdge(edge); |
| getStreamNode(edge.getTargetId()).addInEdge(edge); |
| } |
| |
| public void setParallelism(Integer vertexID, int parallelism) { |
| if (getStreamNode(vertexID) != null) { |
| getStreamNode(vertexID).setParallelism(parallelism); |
| } |
| } |
| |
| public boolean isDynamic() { |
| return dynamic; |
| } |
| |
| public void setParallelism(Integer vertexId, int parallelism, boolean parallelismConfigured) { |
| if (getStreamNode(vertexId) != null) { |
| getStreamNode(vertexId).setParallelism(parallelism, parallelismConfigured); |
| } |
| } |
| |
| public void setDynamic(boolean dynamic) { |
| this.dynamic = dynamic; |
| } |
| |
| public void setMaxParallelism(int vertexID, int maxParallelism) { |
| if (getStreamNode(vertexID) != null) { |
| getStreamNode(vertexID).setMaxParallelism(maxParallelism); |
| } |
| } |
| |
| public void setResources( |
| int vertexID, ResourceSpec minResources, ResourceSpec preferredResources) { |
| if (getStreamNode(vertexID) != null) { |
| getStreamNode(vertexID).setResources(minResources, preferredResources); |
| } |
| } |
| |
| public void setManagedMemoryUseCaseWeights( |
| int vertexID, |
| Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights, |
| Set<ManagedMemoryUseCase> slotScopeUseCases) { |
| if (getStreamNode(vertexID) != null) { |
| getStreamNode(vertexID) |
| .setManagedMemoryUseCaseWeights(operatorScopeUseCaseWeights, slotScopeUseCases); |
| } |
| } |
| |
| public void setOneInputStateKey( |
| Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) { |
| StreamNode node = getStreamNode(vertexID); |
| node.setStatePartitioners(keySelector); |
| node.setStateKeySerializer(keySerializer); |
| } |
| |
| public void setTwoInputStateKey( |
| Integer vertexID, |
| KeySelector<?, ?> keySelector1, |
| KeySelector<?, ?> keySelector2, |
| TypeSerializer<?> keySerializer) { |
| StreamNode node = getStreamNode(vertexID); |
| node.setStatePartitioners(keySelector1, keySelector2); |
| node.setStateKeySerializer(keySerializer); |
| } |
| |
| public void setMultipleInputStateKey( |
| Integer vertexID, |
| List<KeySelector<?, ?>> keySelectors, |
| TypeSerializer<?> keySerializer) { |
| StreamNode node = getStreamNode(vertexID); |
| node.setStatePartitioners(keySelectors.toArray(new KeySelector[0])); |
| node.setStateKeySerializer(keySerializer); |
| } |
| |
| public void setBufferTimeout(Integer vertexID, long bufferTimeout) { |
| if (getStreamNode(vertexID) != null) { |
| getStreamNode(vertexID).setBufferTimeout(bufferTimeout); |
| } |
| } |
| |
| public void setSerializers( |
| Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) { |
| StreamNode vertex = getStreamNode(vertexID); |
| vertex.setSerializersIn(in1, in2); |
| vertex.setSerializerOut(out); |
| } |
| |
| private <OUT> void setSerializers( |
| Integer vertexID, List<TypeInformation<?>> inTypeInfos, TypeSerializer<OUT> out) { |
| |
| StreamNode vertex = getStreamNode(vertexID); |
| |
| vertex.setSerializersIn( |
| inTypeInfos.stream() |
| .map( |
| typeInfo -> |
| typeInfo.createSerializer( |
| executionConfig.getSerializerConfig())) |
| .toArray(TypeSerializer[]::new)); |
| vertex.setSerializerOut(out); |
| } |
| |
| public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) { |
| getStreamNode(vertexID).setInputFormat(inputFormat); |
| } |
| |
| public void setOutputFormat(Integer vertexID, OutputFormat<?> outputFormat) { |
| getStreamNode(vertexID).setOutputFormat(outputFormat); |
| } |
| |
| public void setTransformationUID(Integer nodeId, String transformationId) { |
| StreamNode node = streamNodes.get(nodeId); |
| if (node != null) { |
| node.setTransformationUID(transformationId); |
| } |
| } |
| |
| void setTransformationUserHash(Integer nodeId, String nodeHash) { |
| StreamNode node = streamNodes.get(nodeId); |
| if (node != null) { |
| node.setUserHash(nodeHash); |
| } |
| } |
| |
| public StreamNode getStreamNode(Integer vertexID) { |
| return streamNodes.get(vertexID); |
| } |
| |
| protected Collection<? extends Integer> getVertexIDs() { |
| return streamNodes.keySet(); |
| } |
| |
| @VisibleForTesting |
| public List<StreamEdge> getStreamEdges(int sourceId) { |
| return getStreamNode(sourceId).getOutEdges(); |
| } |
| |
| @VisibleForTesting |
| public List<StreamEdge> getStreamEdges(int sourceId, int targetId) { |
| List<StreamEdge> result = new ArrayList<>(); |
| for (StreamEdge edge : getStreamNode(sourceId).getOutEdges()) { |
| if (edge.getTargetId() == targetId) { |
| result.add(edge); |
| } |
| } |
| return result; |
| } |
| |
| @VisibleForTesting |
| @Deprecated |
| public List<StreamEdge> getStreamEdgesOrThrow(int sourceId, int targetId) { |
| List<StreamEdge> result = getStreamEdges(sourceId, targetId); |
| if (result.isEmpty()) { |
| throw new RuntimeException( |
| "No such edge in stream graph: " + sourceId + " -> " + targetId); |
| } |
| return result; |
| } |
| |
| public Collection<Integer> getSourceIDs() { |
| return sources; |
| } |
| |
| public Collection<Integer> getSinkIDs() { |
| return sinks; |
| } |
| |
| public Collection<StreamNode> getStreamNodes() { |
| return streamNodes.values(); |
| } |
| |
| public String getBrokerID(Integer vertexID) { |
| return vertexIDtoBrokerID.get(vertexID); |
| } |
| |
| public long getLoopTimeout(Integer vertexID) { |
| return vertexIDtoLoopTimeout.get(vertexID); |
| } |
| |
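| /** |
| * Creates the co-located head ({@link StreamIterationHead}) and tail ({@link |
| * StreamIterationTail}) nodes of an iteration and registers both under a shared in-memory |
| * broker ID and loop timeout. |
| */ |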
| public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink( |
| int loopId, |
| int sourceId, |
| int sinkId, |
| long timeout, |
| int parallelism, |
| int maxParallelism, |
| ResourceSpec minResources, |
| ResourceSpec preferredResources) { |
| |
| final String coLocationGroup = "IterationCoLocationGroup-" + loopId; |
| |
| StreamNode source = |
| this.addNode( |
| sourceId, |
| null, |
| coLocationGroup, |
| StreamIterationHead.class, |
| null, |
| ITERATION_SOURCE_NAME_PREFIX + "-" + loopId); |
| sources.add(source.getId()); |
| setParallelism(source.getId(), parallelism); |
| setMaxParallelism(source.getId(), maxParallelism); |
| setResources(source.getId(), minResources, preferredResources); |
| |
| StreamNode sink = |
| this.addNode( |
| sinkId, |
| null, |
| coLocationGroup, |
| StreamIterationTail.class, |
| null, |
| ITERATION_SINK_NAME_PREFIX + "-" + loopId); |
| sinks.add(sink.getId()); |
| setParallelism(sink.getId(), parallelism); |
| setMaxParallelism(sink.getId(), maxParallelism); |
| // The tail node is always in the same slot sharing group as the head node |
| // so that they can share resources (they do not use non-sharable resources, |
| // i.e. managed memory). There is no contract on how the resources should be |
| // divided for head and tail nodes at the moment. To be simple, we assign all |
| // resources to the head node and set the tail node resources to be zero if |
| // resources are specified. |
| final ResourceSpec tailResources = |
| minResources.equals(ResourceSpec.UNKNOWN) |
| ? ResourceSpec.UNKNOWN |
| : ResourceSpec.ZERO; |
| setResources(sink.getId(), tailResources, tailResources); |
| |
| iterationSourceSinkPairs.add(new Tuple2<>(source, sink)); |
| |
| this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId); |
| this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId); |
| this.vertexIDtoLoopTimeout.put(source.getId(), timeout); |
| this.vertexIDtoLoopTimeout.put(sink.getId(), timeout); |
| |
| return new Tuple2<>(source, sink); |
| } |
| |
| public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() { |
| return iterationSourceSinkPairs; |
| } |
| |
| public StreamNode getSourceVertex(StreamEdge edge) { |
| return streamNodes.get(edge.getSourceId()); |
| } |
| |
| public StreamNode getTargetVertex(StreamEdge edge) { |
| return streamNodes.get(edge.getTargetId()); |
| } |
| |
| private void removeEdge(StreamEdge edge) { |
| getSourceVertex(edge).getOutEdges().remove(edge); |
| getTargetVertex(edge).getInEdges().remove(edge); |
| } |
| |
| private void removeVertex(StreamNode toRemove) { |
| Set<StreamEdge> edgesToRemove = new HashSet<>(); |
| |
| edgesToRemove.addAll(toRemove.getInEdges()); |
| edgesToRemove.addAll(toRemove.getOutEdges()); |
| |
| for (StreamEdge edge : edgesToRemove) { |
| removeEdge(edge); |
| } |
| streamNodes.remove(toRemove.getId()); |
| } |
| |
| /** Gets the assembled {@link JobGraph} with a random {@link JobID}. */ |
| @VisibleForTesting |
| public JobGraph getJobGraph() { |
| return getJobGraph(Thread.currentThread().getContextClassLoader(), null); |
| } |
| |
| /** Gets the assembled {@link JobGraph} with a specified {@link JobID}. */ |
| public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) { |
| return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID); |
| } |
| |
| public String getStreamingPlanAsJSON() { |
| try { |
| return new JSONGenerator(this).getJSON(); |
| } catch (Exception e) { |
| throw new RuntimeException("JSON plan creation failed", e); |
| } |
| } |
| |
| private <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInfo) { |
| return typeInfo != null && !(typeInfo instanceof MissingTypeInfo) |
| ? typeInfo.createSerializer(executionConfig.getSerializerConfig()) |
| : null; |
| } |
| |
| public void setJobType(JobType jobType) { |
| this.jobType = jobType; |
| } |
| |
| public JobType getJobType() { |
| return jobType; |
| } |
| |
| public boolean isAutoParallelismEnabled() { |
| return autoParallelismEnabled; |
| } |
| |
| public void setAutoParallelismEnabled(boolean autoParallelismEnabled) { |
| this.autoParallelismEnabled = autoParallelismEnabled; |
| } |
| |
| public PipelineOptions.VertexDescriptionMode getVertexDescriptionMode() { |
| return descriptionMode; |
| } |
| |
| public void setVertexDescriptionMode(PipelineOptions.VertexDescriptionMode mode) { |
| this.descriptionMode = mode; |
| } |
| |
| public void setVertexNameIncludeIndexPrefix(boolean includePrefix) { |
| this.vertexNameIncludeIndexPrefix = includePrefix; |
| } |
| |
| public boolean isVertexNameIncludeIndexPrefix() { |
| return this.vertexNameIncludeIndexPrefix; |
| } |
| |
| /** Registers a {@link JobStatusHook}; a hook that is already registered is ignored. */ |
| public void registerJobStatusHook(JobStatusHook hook) { |
| checkNotNull(hook, "Registering a null JobStatusHook is not allowed."); |
| if (!jobStatusHooks.contains(hook)) { |
| this.jobStatusHooks.add(hook); |
| } |
| } |
| |
| public List<JobStatusHook> getJobStatusHooks() { |
| return this.jobStatusHooks; |
| } |
| |
| public void setSupportsConcurrentExecutionAttempts( |
| Integer vertexId, boolean supportsConcurrentExecutionAttempts) { |
| final StreamNode streamNode = getStreamNode(vertexId); |
| if (streamNode != null) { |
| streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts); |
| } |
| } |
| } |