/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api
import org.apache.flink.annotation.{InterfaceStability, VisibleForTesting}
import org.apache.flink.api.common.accumulators.SerializedListAccumulator
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.{ExecutionMode, JobExecutionResult}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.schedule.VertexInputTracker.{InputDependencyConstraint, VertexInputTrackerOptions}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.{StreamGraph, StreamGraphGenerator}
import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.types.{DataType, DataTypes, RowType}
import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.errorcode.TableErrors
import org.apache.flink.table.expressions.{Expression, TimeAttribute}
import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.cost.{FlinkBatchCost, FlinkCostFactory}
import org.apache.flink.table.plan.logical.{LogicalNode, SinkNode}
import org.apache.flink.table.plan.nodes.calcite.LogicalSink
import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.physical.batch.{BatchExecSink, BatchPhysicalRel}
import org.apache.flink.table.plan.nodes.process.DAGProcessContext
import org.apache.flink.table.plan.optimize.{BatchOptimizeContext, FlinkBatchPrograms}
import org.apache.flink.table.plan.schema._
import org.apache.flink.table.plan.stats.FlinkStatistic
import org.apache.flink.table.plan.subplan.BatchDAGOptimizer
import org.apache.flink.table.plan.util.{DeadlockBreakupProcessor, FlinkNodeOptUtil, FlinkRelOptUtil, SameRelObjectShuttle, SubplanReuseUtil}
import org.apache.flink.table.resource.batch.RunningUnitKeeper
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{BatchTableSource, _}
import org.apache.flink.table.temptable.TableServiceException
import org.apache.flink.table.util.PlanUtil._
import org.apache.flink.table.util._
import org.apache.flink.util.{AbstractID, ExceptionUtils, Preconditions}
import org.apache.calcite.plan.{Context, ConventionTraitDef, RelOptPlanner}
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
import org.apache.calcite.sql.SqlExplainLevel
import org.apache.calcite.sql2rel.SqlToRelConverter
import org.apache.calcite.sql2rel.SqlToRelConverter.Config
import _root_.java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import _root_.scala.collection.JavaConversions._
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable
import _root_.scala.collection.mutable.ArrayBuffer
import _root_.scala.util.{Failure, Success, Try}
/**
* A session to bridge between [[Table]] and [[DataStream]]. Its main functions are:
*
* 1. Obtaining a [[Table]] from a [[DataStream]], or by registering a [[TableSource]];
* 2. Transforming an already constructed [[Table]] into a [[DataStream]];
* 3. Adding a [[TableSink]] to a [[Table]].
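*
* Example (an illustrative sketch; the table name "orders" and its columns are assumptions,
* and a corresponding [[BatchTableSource]] must have been registered beforehand):
* {{{
*   val env = StreamExecutionEnvironment.getExecutionEnvironment
*   val tEnv = new BatchTableEnvironment(env, new TableConfig)
*   // "orders" is a hypothetical registered table
*   val result: Table = tEnv.sqlQuery("SELECT id, amount FROM orders")
*   println(tEnv.explain(result))
* }}}
*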
* @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
*/
@InterfaceStability.Evolving
class BatchTableEnvironment(
val streamEnv: StreamExecutionEnvironment,
config: TableConfig)
extends TableEnvironment(streamEnv, config) {
private val ruKeeper = new RunningUnitKeeper(this)
/** Fetches the [[RunningUnitKeeper]] bound to this table environment. */
private[table] def getRUKeeper: RunningUnitKeeper = ruKeeper
// the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
override protected def createRelBuilder: FlinkRelBuilder = FlinkRelBuilder.create(
frameworkConfig,
config,
getTypeFactory,
Array(
ConventionTraitDef.INSTANCE,
FlinkRelDistributionTraitDef.INSTANCE,
RelCollationTraitDef.INSTANCE),
catalogManager
)
// prefix for unique table names.
override private[flink] val tableNamePrefix = "_DataStreamTable_"
/**
* `expand` is set to false so that each sub-query becomes a [[org.apache.calcite.rex.RexSubQuery]].
*/
override protected def getSqlToRelConverterConfig: Config =
SqlToRelConverter.configBuilder()
.withTrimUnusedFields(false)
.withConvertTableAccess(false)
.withExpand(false)
.withInSubQueryThreshold(Int.MaxValue)
.build()
/**
* Returns the [[FlinkCostFactory]] specific to this environment.
*/
override protected def getFlinkCostFactory: FlinkCostFactory = FlinkBatchCost.FACTORY
/**
* Triggers the program execution with a specific job name.
* @param jobName name for the job
*/
override def execute(jobName: String): JobExecutionResult = {
tableServiceManager.startTableServiceJob()
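// Back up the current sink nodes: if the cached-table service fails during execution,
// they are restored below so the job can be re-planned and re-executed without the cache.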
val sinkNodesBak = new mutable.MutableList[SinkNode]
sinkNodesBak ++= sinkNodes
Try {
val streamGraph = generateStreamGraph(jobName)
val res = executeStreamGraph(streamGraph)
tableServiceManager.markAllTablesCached()
res
} match {
case Success(value) => value
case Failure(ex) => ex match {
case je: JobExecutionException
if ExceptionUtils.findThrowable(je, classOf[TableServiceException]).isPresent =>
sinkNodes ++= sinkNodesBak
tableServiceManager.invalidateCachedTable()
val streamGraph = generateStreamGraph(jobName)
executeStreamGraph(streamGraph)
case _ => throw ex
}
}
}
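/**
 * Executes the given [[Table]] by writing it to the provided [[CollectTableSink]] and returns
 * the collected rows, deserialized from the accumulator result of the finished job.
 */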
private[flink] override def collect[T](
table: Table,
sink: CollectTableSink[T],
jobName: Option[String]): Seq[T] = {
val outType = sink.getOutputType
val typeSerializer = DataTypes.createExternalSerializer(outType)
.asInstanceOf[TypeSerializer[T]]
val id = new AbstractID().toString
sink.init(typeSerializer, id)
writeToSink(table, sink)
val res = execute(jobName.getOrElse(DEFAULT_JOB_NAME))
val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
SerializedListAccumulator.deserializeList(accResult, typeSerializer).asScala
}
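/**
 * Translates the buffered [[StreamTransformation]]s into a [[StreamGraph]] configured for
 * batch execution, choosing BATCH or PIPELINED data exchange mode according to the config.
 */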
protected override def translateStreamGraph(
streamingTransformations: ArrayBuffer[StreamTransformation[_]],
jobName: Option[String]): StreamGraph = {
mergeParameters()
val context = StreamGraphGenerator.Context.buildBatchProperties(streamEnv)
if (getConfig.getConf.getBoolean(TableConfigOptions.SQL_EXEC_DATA_EXCHANGE_MODE_ALL_BATCH)) {
val constraint = getConfig.getConf.getValue(
VertexInputTrackerOptions.INPUT_DEPENDENCY_CONSTRAINT)
// When the user does not set this value, use ALL in batch mode
// to avoid deadlocks and to improve resource utilization.
if (constraint == null) {
getConfig.getConf.setString(
VertexInputTrackerOptions.INPUT_DEPENDENCY_CONSTRAINT,
InputDependencyConstraint.ALL.toString)
}
context.getExecutionConfig.setExecutionMode(ExecutionMode.BATCH)
} else {
context.getExecutionConfig.setExecutionMode(ExecutionMode.PIPELINED)
}
context.setSlotSharingEnabled(false)
ruKeeper.setScheduleConfig(context)
jobName match {
case Some(jn) => context.setJobName(jn)
case None => context.setJobName(DEFAULT_JOB_NAME)
}
val streamGraph = StreamGraphGenerator.generate(context, streamingTransformations)
setupOperatorMetricCollect()
ruKeeper.clear()
streamingTransformations.clear()
streamGraph
}
private def executeStreamGraph(streamGraph: StreamGraph): JobExecutionResult = {
val result = streamEnv.execute(streamGraph)
dumpPlanWithMetricsIfNeed(streamGraph, result)
result
}
/**
* Enables operator metric collection if configured.
*/
@VisibleForTesting
private[flink] def setupOperatorMetricCollect(): Unit = {
if (streamEnv != null &&
streamEnv.getConfig != null &&
config.getConf.getBoolean(TableConfigOptions.SQL_EXEC_OPERATOR_METRIC_DUMP_ENABLED)) {
val parameters = new Configuration()
Option(streamEnv.getConfig.getGlobalJobParameters).foreach(gb =>
gb.toMap.foreach(kv => parameters.setString(kv._1, kv._2))
)
parameters.setString(
AbstractStreamOperatorWithMetrics.METRICS_CONF_KEY,
AbstractStreamOperatorWithMetrics.METRICS_CONF_VALUE)
streamEnv.getConfig.setGlobalJobParameters(parameters)
}
}
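/**
 * Writes the [[Table]] to the given [[TableSink]]. With subsection optimization enabled the
 * sink is only recorded as a [[SinkNode]] and translated later; otherwise it is translated
 * into a [[StreamTransformation]] immediately.
 */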
override private[table] def writeToSink[T](
table: Table,
sink: TableSink[T],
sinkName: String): Unit = {
val sinkNode = SinkNode(table.logicalPlan, sink, sinkName)
if (config.getSubsectionOptimization) {
sinkNodes += sinkNode
} else {
val transformation = translate(table, sink, sinkName)
transformations.add(transformation)
}
}
/**
* Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
* catalog.
*
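* Example (an illustrative sketch; element values are arbitrary):
* {{{
*   // field information is derived from the output type of the stream
*   val words = streamEnv.fromElements("hello", "world")
*   registerBoundedStreamInternal("words", words, replace = false)
* }}}
*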
* @param name The name under which the table is registered in the catalog.
* @param boundedStream The [[DataStream]] to register as a table in the catalog.
* @tparam T the type of the [[DataStream]].
*/
protected def registerBoundedStreamInternal[T](
name: String, boundedStream: DataStream[T], replace: Boolean): Unit = {
val (fieldNames, fieldIdxs) = getFieldInfo(boundedStream.getTransformation.getOutputType)
val boundedStreamTable = new DataStreamTable[T](boundedStream, fieldIdxs, fieldNames)
registerTableInternal(name, boundedStreamTable, replace)
}
/**
* Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
* catalog.
*
* @param name The name under which the table is registered in the catalog.
* @param boundedStream The [[DataStream]] to register as a table in the catalog.
* @param fieldNullables The nullability of each field of the boundedStream.
* @tparam T the type of the [[DataStream]].
*/
protected def registerBoundedStreamInternal[T](
name: String,
boundedStream: DataStream[T],
fieldNullables: Array[Boolean],
replace: Boolean): Unit = {
val dataType = boundedStream.getTransformation.getOutputType
val (fieldNames, fieldIndexes) = getFieldInfo(dataType)
val fieldTypes = TableEnvironment.getFieldTypes(dataType)
val relDataType = getTypeFactory.buildRelDataType(fieldNames, fieldTypes, fieldNullables)
val boundedStreamTable = new IntermediateBoundedStreamTable[T](
relDataType, boundedStream, fieldIndexes, fieldNames)
registerTableInternal(name, boundedStreamTable, replace)
}
/**
* Registers a [[DataStream]] as a table under a given name with field names as specified by
* field expressions in the [[TableEnvironment]]'s catalog.
*
* @param name The name under which the table is registered in the catalog.
* @param boundedStream The [[DataStream]] to register as a table in the catalog.
* @param fields The field expressions to define the field names of the table.
* @tparam T The type of the [[DataStream]].
*/
protected def registerBoundedStreamInternal[T](
name: String, boundedStream: DataStream[T],
fields: Array[Expression], replace: Boolean): Unit = {
if (fields.exists(_.isInstanceOf[TimeAttribute])) {
throw new ValidationException(
".rowtime and .proctime time indicators are not allowed in a batch exec environment.")
}
val dataType = boundedStream.getTransformation.getOutputType
val (fieldNames, fieldIndexes) = getFieldInfo[T](dataType, fields)
val boundedStreamTable = new DataStreamTable[T](boundedStream, fieldIndexes, fieldNames)
registerTableInternal(name, boundedStreamTable, replace)
}
/**
* Registers a [[DataStream]] as a table under a given name with field names as specified by
* field expressions in the [[TableEnvironment]]'s catalog.
*
* @param name The name under which the table is registered in the catalog.
* @param boundedStream The [[DataStream]] to register as a table in the catalog.
* @param fields The field expressions to define the field names of the table.
* @param fieldNullables The nullability of each field of the boundedStream.
* @tparam T The type of the [[DataStream]].
*/
protected def registerBoundedStreamInternal[T](
name: String,
boundedStream: DataStream[T],
fields: Array[Expression],
fieldNullables: Array[Boolean],
replace: Boolean): Unit = {
if (fields.exists(_.isInstanceOf[TimeAttribute])) {
throw new ValidationException(
".rowtime and .proctime time indicators are not allowed in a batch exec environment.")
}
val dataType = boundedStream.getTransformation.getOutputType
val (fieldNames, fieldIndexes) = getFieldInfo(dataType, fields)
val physicalFieldTypes = TableEnvironment.getFieldTypes(dataType)
val fieldTypes = fieldIndexes.map(physicalFieldTypes.apply)
val relDataType = getTypeFactory.buildRelDataType(fieldNames, fieldTypes, fieldNullables)
val boundedStreamTable = new IntermediateBoundedStreamTable[T](
relDataType, boundedStream, fieldIndexes, fieldNames)
registerTableInternal(name, boundedStreamTable, replace)
}
private def translate[A](
table: Table,
sink: TableSink[A],
sinkName: String,
dagOptimizeEnabled: Boolean = false): StreamTransformation[_] = {
val sinkNode = SinkNode(table.logicalPlan, sink, sinkName)
val nodeDag = optimizeAndTranslateNodeDag(dagOptimizeEnabled, sinkNode)
nodeDag.head match {
case batchExecSink: BatchExecSink[A] => translate(batchExecSink, sink.getOutputType)
case _ => throw new TableException(TableErrors.INST.sqlCompileSinkNodeRequired())
}
}
/**
* Translates an [[ExecNode]] DAG into a [[StreamTransformation]] DAG.
*
* @param sinks The node DAG to translate.
* @return The [[StreamTransformation]] DAG that corresponds to the node DAG.
*/
override protected def translate(sinks: Seq[ExecNode[_, _]]): Seq[StreamTransformation[_]] = {
sinks.map {
case n: BatchExecSink[_] => translate(n, n.sink.getOutputType)
case _ => throw new TableException(TableErrors.INST.sqlCompileSinkNodeRequired())
}
}
/**
* Translates a [[BatchExecNode]] plan into a [[StreamTransformation]].
* Converts to target type if necessary.
*
* @param node The plan to translate.
* @param resultType The [[DataType]] of the elements produced by the resulting
* [[StreamTransformation]].
* @return The [[StreamTransformation]] that corresponds to the given node.
*/
private def translate[OUT](
node: ExecNode[_, OUT],
resultType: DataType): StreamTransformation[OUT] = {
TableEnvironment.validateType(resultType)
node match {
case node: BatchExecNode[OUT] => node.translateToPlan(this)
case _ =>
throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
}
/**
* Generates the optimized [[RelNode]] tree from the original relational node tree.
*
* @param relNode The original [[RelNode]] tree
* @return The optimized [[RelNode]] tree
*/
private[flink] def optimize(relNode: RelNode): RelNode = {
val programs = config.getCalciteConfig.getBatchPrograms
.getOrElse(FlinkBatchPrograms.buildPrograms(config.getConf))
Preconditions.checkNotNull(programs)
val optimizedPlan = programs.optimize(relNode, new BatchOptimizeContext {
override def getContext: Context = getFrameworkConfig.getContext
override def getRelOptPlanner: RelOptPlanner = getPlanner
})
// Rewrite equal rel objects into distinct rel objects in order to build the correct DAG
// (DAG reuse is decided by object identity, not by digest).
optimizedPlan.accept(new SameRelObjectShuttle())
}
/**
* Translates a [[Table]] into a [[DataStream]].
*
* The transformation involves optimizing the relational expression tree as defined by
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
*
* @param table The root node of the relational expression tree.
* @param resultType The [[DataType]] of the resulting [[DataStream]].
* @tparam T The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translateToDataStream[T](
table: Table,
resultType: DataType): DataStream[T] = {
val sink = new DataStreamTableSink[T](table, resultType, false, false)
val sinkName = createUniqueTableName()
val sinkNode = LogicalSink.create(table.getRelNode, sink, sinkName)
val optimizedPlan = optimize(sinkNode)
val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
require(optimizedNodes.size() == 1)
val transformation = translate(optimizedNodes.head, resultType)
new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]]
}
/**
* Converts a [[BatchPhysicalRel]] DAG into a [[BatchExecNode]] DAG and translates it.
*/
@VisibleForTesting
private[flink] def translateNodeDag(rels: Seq[RelNode]): Seq[BatchExecNode[_]] = {
require(rels.nonEmpty && rels.forall(_.isInstanceOf[BatchPhysicalRel]))
// reuse subplan
val reusedPlan = SubplanReuseUtil.reuseSubplan(rels, config)
// convert BatchPhysicalRel DAG to BatchExecNode DAG
val nodeDag = reusedPlan.map(_.asInstanceOf[BatchExecNode[_]])
// break up deadlocks
// TODO move DeadlockBreakupProcessor into batch DAGProcessors
val nodeDagWithoutDeadlock = new DeadlockBreakupProcessor().process(
nodeDag, new DAGProcessContext(this))
// build running units
nodeDagWithoutDeadlock.foreach(n => ruKeeper.buildRUs(n.asInstanceOf[BatchExecNode[_]]))
// call processors
val dagProcessors = getConfig.getBatchDAGProcessors
require(dagProcessors != null)
val postNodeDag = dagProcessors.process(
nodeDagWithoutDeadlock, new DAGProcessContext(this, ruKeeper.getRunningUnitMap))
dumpOptimizedPlanIfNeed(postNodeDag)
postNodeDag.map(_.asInstanceOf[BatchExecNode[_]])
}
/**
* Optimizes the [[RelNode]] tree (or DAG) and translates the result into an [[ExecNode]] tree (or DAG).
*/
@VisibleForTesting
private[flink] override def optimizeAndTranslateNodeDag(
dagOptimizeEnabled: Boolean,
logicalNodes: LogicalNode*): Seq[ExecNode[_, _]] = {
if (logicalNodes.isEmpty) {
throw new TableException(TableErrors.INST.sqlCompileNoSinkTblError())
}
val nodeDag = if (dagOptimizeEnabled) {
val optLogicalNodes = tableServiceManager.cachePlanBuilder.buildPlanIfNeeded(logicalNodes)
// optimize dag
val optRelNodes = BatchDAGOptimizer.optimize(optLogicalNodes, this)
// translate node dag
translateNodeDag(optRelNodes)
} else {
require(logicalNodes.size == 1)
val sinkTable = new Table(this, logicalNodes.head)
val originTree = sinkTable.getRelNode
// optimize tree
val optimizedTree = optimize(originTree)
// translate node tree
translateNodeDag(Seq(optimizedTree))
}
require(nodeDag.size() == logicalNodes.size)
nodeDag
}
/**
* Registers an internal [[BatchTableSource]] in this [[TableEnvironment]]'s catalog without
* name checking. Registered tables can be referenced in SQL queries.
*
* @param name The name under which the [[TableSource]] is registered.
* @param tableSource The [[TableSource]] to register.
* @param statistic The [[FlinkStatistic]] of the [[TableSource]].
* @param replace Whether to replace the registered table.
*/
override protected def registerTableSourceInternal(
name: String,
tableSource: TableSource,
statistic: FlinkStatistic,
replace: Boolean = false): Unit = {
tableSource match {
// check for proper batch table source
case batchTableSource: BatchTableSource[_] =>
// check if a table (source or sink) is registered
getTable(name) match {
// table source and/or sink is registered
case Some(table: TableSourceSinkTable[_]) => table.tableSourceTable match {
// wrapper contains source
case Some(_: TableSourceTable) if !replace =>
throw new TableException(s"Table '$name' already exists. " +
s"Please choose a different name.")
// wrapper contains only sink (not source)
case _ =>
val enrichedTable = new TableSourceSinkTable(
Some(new BatchTableSourceTable(batchTableSource, statistic)),
table.tableSinkTable)
replaceRegisteredTable(name, enrichedTable)
}
// no table is registered
case _ =>
val newTable = new TableSourceSinkTable(
Some(new BatchTableSourceTable(batchTableSource, statistic)),
None)
registerTableInternal(name, newTable)
}
// not a batch table source
case _ =>
throw new TableException("Only BatchTableSource can be " +
"registered in BatchTableEnvironment.")
}
}
/**
* Creates a table source and/or table sink from a descriptor.
*
* Descriptors allow for declaring the communication to external systems in an
* implementation-agnostic way. The classpath is scanned for suitable table factories that match
* the desired configuration.
*
* The following example shows how to read from a connector using a JSON format and
* registering a table source as "MyTable":
*
* {{{
*
* tableEnv
* .connect(
* new ExternalSystemXYZ()
* .version("0.11"))
* .withFormat(
* new Json()
* .jsonSchema("{...}")
* .failOnMissingField(false))
* .withSchema(
* new Schema()
* .field("user-name", "VARCHAR").from("u_name")
* .field("count", "DECIMAL")
* .registerSource("MyTable")
* }}}
*
* @param connectorDescriptor connector descriptor describing the external system
*/
def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor = {
new BatchTableDescriptor(this, connectorDescriptor)
}
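/**
 * Registers a [[TableSink]] under the given name after validating that field names and types
 * are non-null, non-empty and of equal length; the sink is configured with these fields
 * before registration.
 */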
override def registerTableSinkInternal(
name: String,
fieldNames: Array[String],
fieldTypes: Array[DataType],
tableSink: TableSink[_],
replace: Boolean): Unit = {
checkValidTableName(name)
if (fieldNames == null) throw new TableException("fieldNames must not be null.")
if (fieldTypes == null) throw new TableException("fieldTypes must not be null.")
if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
if (fieldNames.length != fieldTypes.length) {
throw new TableException("Same number of field names and types required.")
}
// configure and register
val configuredSink = tableSink.configure(fieldNames, fieldTypes)
registerTableSinkInternal(name, configuredSink, replace)
}
protected def registerTableSinkInternal(
name: String,
configuredSink: TableSink[_],
replace: Boolean): Unit = {
// validate
checkValidTableName(name)
if (configuredSink.getFieldNames == null || configuredSink.getFieldTypes == null) {
throw new TableException("Table sink is not configured.")
}
if (configuredSink.getFieldNames.length == 0) {
throw new TableException("Field names must not be empty.")
}
if (configuredSink.getFieldNames.length != configuredSink.getFieldTypes.length) {
throw new TableException("Same number of field names and types required.")
}
// register
configuredSink match {
// check for proper batch table sink
case _: BatchTableSink[_] | _: BatchCompatibleStreamTableSink[_] =>
// check if a table (source or sink) is registered
getTable(name) match {
// table source and/or sink is registered
case Some(table: TableSourceSinkTable[_]) => table.tableSinkTable match {
// wrapper contains sink
case Some(_: TableSinkTable[_]) if !replace =>
throw new TableException(s"Table '$name' already exists. " +
s"Please choose a different name.")
// wrapper contains only source (not sink)
case _ =>
val enrichedTable = new TableSourceSinkTable(
table.tableSourceTable,
Some(new TableSinkTable(configuredSink)))
replaceRegisteredTable(name, enrichedTable)
}
// no table is registered
case _ =>
val newTable = new TableSourceSinkTable(
None,
Some(new TableSinkTable(configuredSink)))
registerTableInternal(name, newTable)
}
// not a batch table sink
case _ =>
throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
}
}
/**
* Merges the global job parameters with the table config parameters
* and sets the merged result as the new GlobalJobParameters.
*/
private def mergeParameters(): Unit = {
if (streamEnv != null && streamEnv.getConfig != null) {
val parameters = new Configuration()
if (config != null && config.getConf != null) {
parameters.addAll(config.getConf)
}
if (streamEnv.getConfig.getGlobalJobParameters != null) {
streamEnv.getConfig.getGlobalJobParameters.toMap.asScala.foreach {
kv => parameters.setString(kv._1, kv._2)
}
}
streamEnv.getConfig.setGlobalJobParameters(parameters)
}
}
/**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given [[Table]].
*
* @param table The table for which the AST and execution plan will be returned.
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {
val ast = table.getRelNode
// explain as simple tree, ignore dag optimization if it's enabled
val optimizedNode = optimizeAndTranslateNodeDag(false, table.logicalPlan).head
val fieldTypes = ast.getRowType.getFieldList.asScala
.map(field => FlinkTypeFactory.toInternalType(field.getType))
val transformation = translate(optimizedNode, new RowType(fieldTypes: _*))
val streamGraph = translateStreamGraph(ArrayBuffer(transformation), None)
val executionPlan = PlanUtil.explainPlan(streamGraph)
val (explainLevel, withResource, withMemCost) = if (extended) {
(SqlExplainLevel.ALL_ATTRIBUTES, true, true)
} else {
(SqlExplainLevel.EXPPLAN_ATTRIBUTES, false, false)
}
s"== Abstract Syntax Tree ==" +
System.lineSeparator +
s"${FlinkRelOptUtil.toString(ast)}" +
System.lineSeparator +
s"== Optimized Logical Plan ==" +
System.lineSeparator +
s"${FlinkNodeOptUtil.treeToString(
optimizedNode, explainLevel, withResource, withMemCost)}" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
System.lineSeparator +
s"$executionPlan"
}
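/**
 * Returns the AST and the execution plan of the given [[Table]] without detailed estimates.
 *
 * Example (an illustrative sketch; "orders" is a hypothetical registered batch table):
 * {{{
 *   println(tEnv.explain(tEnv.sqlQuery("SELECT * FROM orders")))
 * }}}
 */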
def explain(table: Table): String = explain(table: Table, extended = false)
def explain(extended: Boolean = false): String = {
if (!config.getSubsectionOptimization) {
throw new TableException("Can not explain due to subsection optimization is not supported, " +
"please check your TableConfig.")
}
val sinkExecNodes = optimizeAndTranslateNodeDag(true, sinkNodes: _*)
val sb = new StringBuilder
sb.append("== Abstract Syntax Tree ==")
sb.append(System.lineSeparator)
sinkNodes.foreach { sink =>
val table = new Table(this, sink.children.head)
val ast = table.getRelNode
sb.append(FlinkRelOptUtil.toString(ast))
sb.append(System.lineSeparator)
}
sb.append("== Optimized Logical Plan ==")
sb.append(System.lineSeparator)
val (explainLevel, withResource, withMemCost) = if (extended) {
(SqlExplainLevel.ALL_ATTRIBUTES, true, true)
} else {
(SqlExplainLevel.EXPPLAN_ATTRIBUTES, false, false)
}
sb.append(FlinkNodeOptUtil.dagToString(sinkExecNodes, explainLevel, withResource, withMemCost))
val sinkTransformations = translate(sinkExecNodes)
val streamGraph = StreamGraphGenerator.generate(
StreamGraphGenerator.Context.buildBatchProperties(streamEnv), sinkTransformations)
val sqlPlan = PlanUtil.explainPlan(streamGraph)
sb.append("== Physical Execution Plan ==")
sb.append(System.lineSeparator)
sb.append(sqlPlan)
sb.toString()
}
/**
* Dumps the stream graph plan with accumulated operator metrics if enabled in the config.
*
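* The enable flag and the dump path are both taken from the table config, e.g. (illustrative
* values; the path below is a hypothetical example):
* {{{
*   config.getConf.setBoolean(TableConfigOptions.SQL_EXEC_OPERATOR_METRIC_DUMP_ENABLED, true)
*   config.getConf.setString(TableConfigOptions.SQL_EXEC_OPERATOR_METRIC_DUMP_PATH, "/tmp/metrics")
* }}}
*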
* @param streamGraph streamGraph
* @param jobResult job result of stream graph
*/
private[this] def dumpPlanWithMetricsIfNeed(
streamGraph: StreamGraph,
jobResult: JobExecutionResult): Unit = {
val dumpFilePath = config.getConf.getString(
TableConfigOptions.SQL_EXEC_OPERATOR_METRIC_DUMP_PATH)
if (config.getConf.getBoolean(TableConfigOptions.SQL_EXEC_OPERATOR_METRIC_DUMP_ENABLED)
&& dumpFilePath != null) {
streamGraph.dumpPlanWithMetrics(dumpFilePath, jobResult)
}
}
/**
* Dumps the optimized plan if enabled in the config.
*
* @param optimizedNodes optimized plan
*/
private[this] def dumpOptimizedPlanIfNeed(optimizedNodes: Seq[ExecNode[_, _]]): Unit = {
val dumpFilePath = config.getConf.getString(TableConfigOptions.SQL_OPTIMIZER_PLAN_DUMP_PATH)
val planDump = config.getConf.getBoolean(TableConfigOptions.SQL_OPTIMIZER_PLAN_DUMP_ENABLED)
if (planDump && dumpFilePath != null) {
dumpExecNodes(optimizedNodes, dumpFilePath)
}
}
}