blob: 67e5fe24ecd97c9cfffc4bc2d1a3e33d07334109 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.util.resource;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
/**
* StreamGraphPropertyGenerator base on rules.
*/
public class StreamGraphPropertyGenerator {
/**
* Generate json format of streaming graph.
*
* @param streamGraph streamGraph
* @return ResourceFile
*/
public static StreamGraphProperty generateProperties(StreamGraph streamGraph) {
Preconditions.checkNotNull(streamGraph, "streamGraph cannot be null");
StreamGraphProperty properties = new StreamGraphProperty();
Map<String, StreamNodeProperty> uidToNodeProperties = new HashMap<>();
Map<Integer, String> nodeIdMap = new HashMap<>();
Map<String, StreamNode> existedUidMap = StreamNodeUtil.setUid(streamGraph);
// nodes
for (StreamNode streamNode : streamGraph.getStreamNodes()) {
String uid = streamNode.getTransformationUID();
nodeIdMap.put(streamNode.getId(), uid);
StreamNodeProperty streamNodeProperty = new StreamNodeProperty(uid);
uidToNodeProperties.put(uid, streamNodeProperty);
streamNodeProperty.setName(getNodeName(streamNode.getOperatorName()));
if (streamNode.getInEdges().isEmpty()) {
streamNodeProperty.setPact("Source");
} else if (streamNode.getOutEdges().isEmpty()) {
streamNodeProperty.setPact("Sink");
} else {
streamNodeProperty.setPact("Operator");
}
streamNodeProperty.setName(getNodeName(streamNode.getOperatorName()));
// set max parallelism according to origin max parallelism
int nodeMaxParallel = StreamNodeUtil.getMaxParallelism(streamNode);
int maxParallelism = nodeMaxParallel > 0 ? nodeMaxParallel : StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM;
streamNodeProperty.setMaxParallelism(maxParallelism);
if (streamNode.getParallelism() > 0) {
// parallelism configured already, not changed.
streamNodeProperty.setParallelism(streamNode.getParallelism());
}
ResourceSpec minResourceSpec = streamNode.getMinResources();
Preconditions.checkArgument(minResourceSpec != null, "resource can not be null.");
streamNodeProperty.setCpuCores(minResourceSpec.getCpuCores());
streamNodeProperty.setHeapMemoryInMB(minResourceSpec.getHeapMemory());
streamNodeProperty.setDirectMemoryInMB(minResourceSpec.getDirectMemory());
streamNodeProperty.setNativeMemoryInMB(minResourceSpec.getNativeMemory());
streamNodeProperty.setGpuLoad(minResourceSpec.getGPUResource());
if (minResourceSpec.getExtendedResources().containsKey(ResourceSpec.MANAGED_MEMORY_NAME)) {
streamNodeProperty.setManagedMemoryInMB(
(int) ((minResourceSpec.getExtendedResources().get(ResourceSpec.MANAGED_MEMORY_NAME)).getValue()));
}
if (minResourceSpec.getExtendedResources().containsKey(ResourceSpec.FLOATING_MANAGED_MEMORY_NAME)) {
streamNodeProperty.setFloatingManagedMemoryInMB(
(int) ((minResourceSpec.getExtendedResources().get(ResourceSpec.FLOATING_MANAGED_MEMORY_NAME)).getValue()));
}
properties.getStreamNodeProperties().add(streamNodeProperty);
}
StreamNodeUtil.clearAppendUids(streamGraph, existedUidMap);
// links
for (StreamNode source : streamGraph.getStreamNodes()) {
Map<Integer, Integer> edgeCounts = new HashMap<>();
for (StreamEdge edge : source.getOutEdges()) {
edgeCounts.putIfAbsent(edge.getTargetId(), 0);
int index = edgeCounts.put(edge.getTargetId(), 1 + edgeCounts.get(edge.getTargetId()));
StreamEdgeProperty streamEdgeProperty = new StreamEdgeProperty(
nodeIdMap.get(edge.getSourceId()),
nodeIdMap.get(edge.getTargetId()),
index);
streamEdgeProperty.setShipStrategy(edge.getPartitioner().toString());
// change rescale and rebalance to forward
StreamNodeProperty sourceNode = uidToNodeProperties.get(streamEdgeProperty.getSource());
StreamNodeProperty targetNode = uidToNodeProperties.get(streamEdgeProperty.getTarget());
if (sourceNode.getParallelism() == targetNode.getParallelism() &&
sourceNode.getMaxParallelism() == targetNode.getMaxParallelism() &&
StreamEdgeProperty.STRATEGY_UPDATABLE.contains(streamEdgeProperty.getShipStrategy())) {
streamEdgeProperty.setShipStrategy(StreamEdgeProperty.FORWARD_STRATEGY);
}
properties.getStreamEdgeProperties().add(streamEdgeProperty);
}
}
return properties;
}
private static String getNodeName(String nodeName) {
return StringUtil.filterSpecChars(nodeName.trim())
.replaceFirst("Source: ", "") // "Source: " will be auto added
.replaceFirst("Sink: ", ""); // "Sink: " will be auto added
}
}