blob: 27a446e32b1e33b3729ced43dd57af702ba28e08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.util
import org.apache.flink.table.plan.nodes.calcite.Sink
import org.apache.flink.table.plan.nodes.exec.{ExecNode, ExecNodeVisitor}
import com.google.common.collect.{Maps, Sets}
import org.apache.calcite.sql.SqlExplainLevel
import java.io.{PrintWriter, StringWriter}
import java.util
import java.util.concurrent.atomic.AtomicInteger
object FlinkNodeOptUtil {
/**
* Converts an [[ExecNode]] tree to a string as a tree style.
*
* @param node the ExecNode to convert
* @param detailLevel detailLevel defines detail levels for EXPLAIN PLAN.
* @param withResource whether including resource information of ExecNode (only apply to
* BatchExecNode node at present)
* @param withMemCost whether including memory cost information of ExecNode (only apply to
* BatchExecNode node at present)
* @param withRelNodeId whether including ID of the RelNode corresponding to an ExecNode
* @param withRetractTraits whether including Retraction Traits of RelNode corresponding to
* an ExecNode (only apply to StreamPhysicalRel node at present)
* @return explain plan of ExecNode
*/
def treeToString(
node: ExecNode[_, _],
detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
withResource: Boolean = false,
withMemCost: Boolean = false, // TODO remove this arg ???
withRelNodeId: Boolean = false,
withRetractTraits: Boolean = false): String = {
doConvertTreeToString(
node,
detailLevel = detailLevel,
withResource = withResource,
withMemCost = withMemCost,
withRelNodeId = withRelNodeId,
withRetractTraits = withRetractTraits)
}
/**
* Converts an [[ExecNode]] DAG to a string as a tree style.
*
* @param nodes the ExecNodes to convert
* @param detailLevel detailLevel defines detail levels for EXPLAIN PLAN.
* @param withResource whether including resource information of ExecNode (only apply to
* BatchExecNode node at present)
* @param withMemCost whether including memory cost information of ExecNode (only apply to
* BatchExecNode node at present)
* @param withRelNodeId whether including ID of the RelNode corresponding to an ExecNode
* @param withRetractTraits whether including Retraction Traits of RelNode corresponding to
* an ExecNode (only apply to StreamPhyscialRel node at present)
* @return explain plan of ExecNode
*/
def dagToString(
nodes: Seq[ExecNode[_, _]],
detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
withResource: Boolean = false,
withMemCost: Boolean = false, // TODO remove this arg ???
withRelNodeId: Boolean = false,
withRetractTraits: Boolean = false): String = {
if (nodes.length == 1) {
return treeToString(
nodes.head,
detailLevel,
withResource = withResource,
withMemCost = withMemCost,
withRelNodeId = withRelNodeId,
withRetractTraits = withRetractTraits)
}
val reuseInfoBuilder = new ReuseInfoBuilder()
nodes.foreach(reuseInfoBuilder.visit)
// node sets that stop explain when meet them
val stopExplainNodes = Sets.newIdentityHashSet[ExecNode[_, _]]()
// mapping node to reuse info, the map value is a tuple2,
// the first value of the tuple is reuse id,
// the second value is true if the node is first visited else false.
val reuseInfoMap = Maps.newIdentityHashMap[ExecNode[_, _], (Integer, Boolean)]()
// mapping node object to visited times
val mapNodeToVisitedTimes = Maps.newIdentityHashMap[ExecNode[_, _], Int]()
val sb = new StringBuilder()
val visitor = new ExecNodeVisitor {
override def visit(node: ExecNode[_, _]): Unit = {
val visitedTimes = mapNodeToVisitedTimes.getOrDefault(node, 0) + 1
mapNodeToVisitedTimes.put(node, visitedTimes)
if (visitedTimes == 1) {
super.visit(node)
}
val reuseId = reuseInfoBuilder.getReuseId(node)
val isReuseNode = reuseId.isDefined
if (node.isInstanceOf[Sink] || (isReuseNode && !reuseInfoMap.containsKey(node))) {
if (isReuseNode) {
reuseInfoMap.put(node, (reuseId.get, true))
}
val reusePlan = doConvertTreeToString(
node,
detailLevel = detailLevel,
withResource = withResource,
withMemCost = withMemCost,
withRelNodeId = withRelNodeId,
withRetractTraits = withRetractTraits,
stopExplainNodes = Some(stopExplainNodes),
reuseInfoMap = Some(reuseInfoMap))
sb.append(reusePlan).append(System.lineSeparator)
if (isReuseNode) {
// update visit info after the reuse node visited
stopExplainNodes.add(node)
reuseInfoMap.put(node, (reuseId.get, false))
}
}
}
}
nodes.foreach(visitor.visit)
if (sb.length() > 0) {
// delete last line separator
sb.deleteCharAt(sb.length - 1)
}
sb.toString()
}
private def doConvertTreeToString(
node: ExecNode[_, _],
detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
withResource: Boolean = false,
withMemCost: Boolean = false,
withRelNodeId: Boolean = false,
withRetractTraits: Boolean = false,
stopExplainNodes: Option[util.Set[ExecNode[_, _]]] = None,
reuseInfoMap: Option[util.IdentityHashMap[ExecNode[_, _], (Integer, Boolean)]] = None
): String = {
// get ExecNode explain value by RelNode#explain now
val sw = new StringWriter
val planWriter = new NodeTreeWriterImpl(
node,
new PrintWriter(sw),
explainLevel = detailLevel,
withResource = withResource,
withMemCost = withMemCost,
withRelNodeId = withRelNodeId,
withRetractTraits = withRetractTraits,
stopExplainNodes = stopExplainNodes,
reuseInfoMap = reuseInfoMap)
node.getFlinkPhysicalRel.explain(planWriter)
sw.toString
}
/**
* build reuse id in an ExecNode DAG.
*/
class ReuseInfoBuilder extends ExecNodeVisitor {
// visited node set
private val visitedNodes = Sets.newIdentityHashSet[ExecNode[_, _]]()
// mapping reuse node to its reuse id
private val mapReuseNodeToReuseId = Maps.newIdentityHashMap[ExecNode[_, _], Integer]()
private val reuseIdGenerator = new AtomicInteger(0)
override def visit(node: ExecNode[_, _]): Unit = {
// if a node is visited more than once, this node is a reusable node
if (visitedNodes.contains(node)) {
if (!mapReuseNodeToReuseId.containsKey(node)) {
val reuseId = reuseIdGenerator.incrementAndGet()
mapReuseNodeToReuseId.put(node, reuseId)
}
} else {
visitedNodes.add(node)
super.visit(node)
}
}
/**
* Returns reuse id if the given node is a reuse node (that means it has multiple outputs),
* else None.
*/
def getReuseId(node: ExecNode[_, _]): Option[Integer] = {
if (mapReuseNodeToReuseId.containsKey(node)) {
Some(mapReuseNodeToReuseId.get(node))
} else {
None
}
}
}
}