| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.api |
| |
| import org.apache.flink.annotation.{Experimental, Internal, VisibleForTesting} |
| import org.apache.flink.api.common.JobExecutionResult |
| import org.apache.flink.api.common.time.Time |
| import org.apache.flink.api.common.typeinfo.TypeInformation |
| import org.apache.flink.api.java.typeutils._ |
| import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} |
| import org.apache.flink.streaming.api.graph.StreamGraph |
| import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} |
| import org.apache.flink.streaming.api.transformations.StreamTransformation |
| import org.apache.flink.table.api.functions.{AggregateFunction, ScalarFunction, TableFunction} |
| import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnvironment, StreamTableEnvironment => JavaStreamTableEnv} |
| import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnvironment, StreamTableEnvironment => ScalaStreamTableEnv, _} |
| import org.apache.flink.table.api.types._ |
| import org.apache.flink.table.calcite._ |
| import org.apache.flink.table.catalog._ |
| import org.apache.flink.table.codegen._ |
| import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor} |
| import org.apache.flink.table.errorcode.TableErrors |
| import org.apache.flink.table.expressions.{Alias, Expression, TimeAttribute, UnresolvedFieldReference} |
| import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils |
| import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ |
| import org.apache.flink.table.plan.cost.FlinkCostFactory |
| import org.apache.flink.table.plan.logical.{CatalogNode, LogicalNode, LogicalRelNode, SinkNode} |
| import org.apache.flink.table.plan.nodes.exec.ExecNode |
| import org.apache.flink.table.plan.schema._ |
| import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats} |
| import org.apache.flink.table.sinks._ |
| import org.apache.flink.table.sources.TableSource |
| import org.apache.flink.table.temptable.FlinkTableServiceManager |
| import org.apache.flink.table.typeutils.TypeUtils |
| import org.apache.flink.table.validate.{BuiltInFunctionCatalog, ChainedFunctionCatalog, FunctionCatalog} |
| |
| import org.apache.calcite.config.Lex |
| import org.apache.calcite.plan.{Contexts, RelOptPlanner} |
| import org.apache.calcite.rel.logical.LogicalTableModify |
| import org.apache.calcite.schema |
| import org.apache.calcite.schema.SchemaPlus |
| import org.apache.calcite.schema.impl.AbstractTable |
| import org.apache.calcite.sql.parser.SqlParser |
| import org.apache.calcite.sql.util.ChainedSqlOperatorTable |
| import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlOperatorTable, _} |
| import org.apache.calcite.sql2rel.SqlToRelConverter |
| import org.apache.calcite.tools._ |
| import org.apache.commons.lang3.StringUtils |
| |
| import _root_.java.lang.reflect.Modifier |
| import _root_.java.util |
| import _root_.java.util.concurrent.atomic.AtomicInteger |
| import _root_.java.util.concurrent.atomic.AtomicBoolean |
| |
| import _root_.scala.annotation.varargs |
| import _root_.scala.collection.JavaConversions._ |
| import _root_.scala.collection.JavaConverters._ |
| import _root_.scala.collection.mutable |
| import _root_.scala.collection.mutable.ArrayBuffer |
| |
| /** |
| * The abstract base class for batch and stream TableEnvironments. |
| * |
  * @param execEnv The [[JavaStreamExecEnv]] backing this TableEnvironment.
  * @param config The configuration of the TableEnvironment
| */ |
| abstract class TableEnvironment( |
| private[flink] val execEnv: JavaStreamExecEnv, |
| val config: TableConfig) extends AutoCloseable { |
| |
| protected val DEFAULT_JOB_NAME = "Flink Exec Table Job" |
| |
| protected val catalogManager: CatalogManager = new CatalogManager() |
| private val currentSchema: SchemaPlus = catalogManager.getRootSchema |
| |
| private val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new FlinkTypeSystem) |
| |
  // the built-in Table API/SQL function catalog (does not contain external functions)
| private val functionCatalog: FunctionCatalog = BuiltInFunctionCatalog.withBuiltIns() |
| |
  // chained Table API/SQL function catalog that wraps the built-in function catalog
| private[flink] lazy val chainedFunctionCatalog: FunctionCatalog = |
| new ChainedFunctionCatalog(Seq(functionCatalog)) |
| |
| // the configuration to create a Calcite planner |
| protected var frameworkConfig: FrameworkConfig = createFrameworkConfig |
| |
| // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. |
| protected var relBuilder: FlinkRelBuilder = createRelBuilder |
| |
| // the planner instance used to optimize queries of this TableEnvironment |
| private var planner: RelOptPlanner = createRelOptPlanner |
| |
| // reuse flink planner |
| private var flinkPlanner: FlinkPlannerImpl = createFlinkPlanner |
| |
| // a counter for unique attribute names |
| private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0) |
| |
| // a counter for unique table names |
| private[flink] val tableNameCntr: AtomicInteger = new AtomicInteger(0) |
| |
| private[flink] val tableNamePrefix = "_TempTable_" |
| |
| // sink nodes collection |
| private[flink] val sinkNodes = new mutable.MutableList[SinkNode] |
| |
| private[flink] val transformations = new ArrayBuffer[StreamTransformation[_]] |
| |
| protected var userClassloader: ClassLoader = null |
| |
| // a manager for table service |
| private[flink] val tableServiceManager = new FlinkTableServiceManager(this) |
| |
| private val closed: AtomicBoolean = new AtomicBoolean(false) |
| |
| // the configuration for SqlToRelConverter |
| private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = { |
| val calciteConfig = config.getCalciteConfig |
| calciteConfig.getSqlToRelConverterConfig match { |
| case Some(c) => c |
| case None => getSqlToRelConverterConfig |
| } |
| } |
| |
| /** Returns the table config to define the runtime behavior of the Table API. */ |
| def getConfig: TableConfig = config |
| |
  /** Returns the [[QueryConfig]] depending on the concrete type of this TableEnvironment. */
| private[flink] def queryConfig: QueryConfig = this match { |
| case _: BatchTableEnvironment => new BatchQueryConfig |
| case _: StreamTableEnvironment => new StreamQueryConfig |
| case _ => null |
| } |
| |
| /** |
    * Compiles the sink [[org.apache.flink.table.plan.logical.LogicalNode]]s into
    * [[org.apache.flink.streaming.api.transformations.StreamTransformation]]s.
| */ |
| protected def compile(): Unit = { |
| if (config.getSubsectionOptimization) { |
| // optimize rel node, and translate to node dag |
| val nodeDag = optimizeAndTranslateNodeDag(true, sinkNodes: _*) |
| // translate to transformation |
| val sinkTransformations = translate(nodeDag) |
| transformations.addAll(sinkTransformations) |
| } |
| } |
| |
| /** |
| * Optimize the RelNode tree (or DAG), and translate the result to [[ExecNode]] tree (or DAG). |
| */ |
| @VisibleForTesting |
| private[flink] def optimizeAndTranslateNodeDag( |
| dagOptimizeEnabled: Boolean, logicalNodes: LogicalNode*): Seq[ExecNode[_, _]] |
| |
| /** |
| * Translates a [[ExecNode]] DAG into a [[StreamTransformation]] DAG. |
| * |
| * @param sinks The node DAG to translate. |
| * @return The [[StreamTransformation]] DAG that corresponds to the node DAG. |
| */ |
| protected def translate(sinks: Seq[ExecNode[_, _]]): Seq[StreamTransformation[_]] |
| |
| /** |
    * Generates a [[StreamGraph]] from this table environment. This also clears the
    * registered [[LogicalNode]]s.
| * @return A [[StreamGraph]] describing the whole job. |
| */ |
| def generateStreamGraph(): StreamGraph = generateStreamGraph(DEFAULT_JOB_NAME) |
| |
| /** |
    * Generates a [[StreamGraph]] from this table environment. This also clears the
    * registered [[LogicalNode]]s.
    * @param jobName The name of the job.
    * @return A [[StreamGraph]] describing the whole job.
| */ |
| def generateStreamGraph(jobName: String): StreamGraph = { |
| try { |
| if (config.getSubsectionOptimization) { |
| compile() |
| } |
| if (transformations.isEmpty) { |
| throw new TableException("No table sinks have been created yet. " + |
| "A program needs at least one sink that consumes data. ") |
| } |
| translateStreamGraph(transformations, Option.apply(jobName)) |
| } finally { |
| sinkNodes.clear() |
| } |
| } |
| |
| /** |
    * Translates the given stream transformations into a [[StreamGraph]].
    * @param streamingTransformations The transformations to translate.
    * @param jobName An optional name for the resulting job.
| * @return A [[StreamGraph]] describing the given job. |
| */ |
| protected def translateStreamGraph( |
| streamingTransformations: ArrayBuffer[StreamTransformation[_]], |
| jobName: Option[String]): StreamGraph = ??? |
| |
| /** |
| * Returns the operator table for this environment including a custom Calcite configuration. |
| */ |
| protected def getSqlOperatorTable: SqlOperatorTable = { |
| val calciteConfig = config.getCalciteConfig |
| |
| calciteConfig.getSqlOperatorTable match { |
| case None => |
| chainedFunctionCatalog.getSqlOperatorTable |
| case Some(table) => |
| if (calciteConfig.replacesSqlOperatorTable) { |
| table |
| } else { |
| ChainedSqlOperatorTable.of(chainedFunctionCatalog.getSqlOperatorTable, table) |
| } |
| } |
| } |
| |
| /** |
| * Returns the SQL parser config for this environment including a custom Calcite configuration. |
| */ |
| protected def getSqlParserConfig: SqlParser.Config = { |
| val calciteConfig = config.getCalciteConfig |
| calciteConfig.getSqlParserConfig match { |
| |
| case None => |
| // we use Java lex because back ticks are easier than double quotes in programming |
| // and cases are preserved |
| SqlParser |
| .configBuilder() |
| .setLex(Lex.JAVA) |
| .setIdentifierMaxLength(256) |
| .build() |
| |
| case Some(sqlParserConfig) => |
| sqlParserConfig |
| } |
| } |
| |
| /** |
| * Returns the SqlToRelConverter config. |
| */ |
| @Internal |
| protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = |
| SqlToRelConverter.configBuilder() |
| .withTrimUnusedFields(false) |
| .withConvertTableAccess(false) |
| .build() |
| |
| def getCatalogManager: CatalogManager = { |
| catalogManager |
| } |
| |
| /** |
    * Registers a [[ReadableCatalog]] under a unique name in the TableEnvironment's schema.
| * All tables registered in the [[ReadableCatalog]] can be accessed. |
| * |
| * @param name The name under which the catalog will be registered |
| * @param catalog The catalog to register |
| */ |
| def registerCatalog(name: String, catalog: ReadableCatalog): Unit = { |
| registerCatalogInternal(name, catalog) |
| } |
| |
| /** |
    * Registers a [[ReadableCatalog]] under a unique name in the TableEnvironment's schema.
| * All tables registered in the [[ReadableCatalog]] can be accessed. |
| * |
| * @param name The name under which the catalog will be registered |
| * @param catalog The catalog to register |
| * |
| */ |
| @throws[CatalogAlreadyExistException] |
| def registerCatalogInternal( |
| name: String, |
| catalog: ReadableCatalog): Unit = { |
| |
| catalogManager.registerCatalog(name, catalog) |
| } |
| |
| /** |
    * Gets a registered catalog.
    *
    * @param catalogName The name of the catalog to retrieve.
    * @return The registered [[ReadableCatalog]].
| */ |
| @throws[CatalogNotExistException] |
| def getCatalog(catalogName: String): ReadableCatalog = { |
| catalogManager.getCatalog(catalogName) |
| } |
| |
| /** |
    * Gets the default registered catalog.
    *
    * @return The default [[ReadableWritableCatalog]].
| */ |
| @throws[CatalogNotExistException] |
| def getDefaultCatalog(): ReadableWritableCatalog = { |
| catalogManager.getCatalog(getDefaultCatalogName()).asInstanceOf[ReadableWritableCatalog] |
| } |
| |
| /** |
| * Get the default catalog name. |
| */ |
| def getDefaultCatalogName(): String = { |
| catalogManager.getDefaultCatalogName |
| } |
| |
| /** |
| * Get the default database name. |
| */ |
| def getDefaultDatabaseName(): String = { |
| catalogManager.getDefaultDatabaseName |
| } |
| |
| /** |
| * Set a default catalog. |
| * |
| * @param name Name of the catalog |
| */ |
| def setDefaultCatalog(name: String): Unit = { |
| catalogManager.setDefaultCatalog(name) |
| } |
| |
| /** |
| * Set a default catalog and database. |
| * |
| * @param catalogName Name of the catalog |
| * @param dbName Name of the database |
| */ |
| def setDefaultDatabase(catalogName: String, dbName: String): Unit = { |
| catalogManager.setDefaultDatabase(catalogName, dbName) |
| } |
| |
| /** |
    * Sets the default database. If a catalog is not specified, the database is resolved
    * relative to the current default catalog.
    * Note that this method cannot be used to set only the default catalog.
| * |
| * @param dbPath Name or path of the database |
| */ |
| @varargs |
| def setDefaultDatabase(dbPath: String*): Unit = { |
    if (dbPath == null || dbPath.length < 1 || dbPath.length > 2 ||
      dbPath(0) == null || dbPath(0).isEmpty ||
      (dbPath.length == 2 && (dbPath(1) == null || dbPath(1).isEmpty))) {
      throw new IllegalArgumentException(String.format("Invalid database path %s", dbPath))
| } |
| |
| val catalogName = if (dbPath.length == 1) getDefaultCatalogName() else dbPath(0) |
| val dbName = if (dbPath.length == 1) dbPath(0) else dbPath(1) |
| |
| catalogManager.setDefaultDatabase(catalogName, dbName) |
| } |
| |
| /** |
    * Gets a table from the catalogs.
| * |
| * @param name The name of the table. |
| * @return The table registered either internally or externally, None otherwise. |
| */ |
| def getTable(name: String): Option[org.apache.calcite.schema.Table] = { |
| getTable(name.split('.')) |
| } |
| |
| /** |
    * Gets a table from the catalogs.
| * |
| * @param paths The paths of the table. |
| * @return The table registered either internally or externally, None otherwise. |
| */ |
| def getTable(paths: Array[String]): Option[org.apache.calcite.schema.Table] = { |
| val names = catalogManager.resolveTableName(paths : _*) |
| val catalogName = names(0) |
| val dbName = names(1) |
| val tableName = names(2) |
| |
| val catalogSchema = catalogManager.getRootSchema.getSubSchema(catalogName) |
| |
| if (catalogSchema == null) { |
| Option.empty |
| } else { |
| val dbSchema = catalogSchema.getSubSchema(dbName) |
| |
| if (dbSchema == null) { |
| Option.empty |
| } else { |
| Option(dbSchema.getTable(tableName)) |
| } |
| } |
| } |
| |
| /** |
| * Registers a [[ScalarFunction]] under a unique name. Replaces already existing |
| * user-defined functions under this name. |
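    *
    * Example (a sketch; `HashCode` is a hypothetical user-defined function):
    * {{{
    *   class HashCode extends ScalarFunction {
    *     def eval(s: String): Int = s.hashCode
    *   }
    *   tableEnv.registerFunction("hashCode", new HashCode)
    *   tableEnv.sqlQuery("SELECT hashCode(name) FROM MyTable")
    * }}}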
| */ |
| def registerFunction(name: String, function: ScalarFunction): Unit = { |
| // check if class could be instantiated |
| checkForInstantiation(function.getClass) |
| |
| // register in Table API |
| functionCatalog.registerFunction(name, function.getClass) |
| |
| // register in SQL API |
| functionCatalog.registerSqlFunction( |
| createScalarSqlFunction(name, name, function, typeFactory) |
| ) |
| } |
| |
| /** |
| * Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. |
| * Registered functions can be referenced in Table API and SQL queries. |
| * |
| * @param name The name under which the function is registered. |
| * @param f The AggregateFunction to register. |
| * @tparam T The type of the output value. |
| * @tparam ACC The type of aggregate accumulator. |
| */ |
| def registerFunction[T, ACC]( |
| name: String, |
| f: AggregateFunction[T, ACC]) |
| : Unit = { |
| implicit val typeInfo: TypeInformation[T] = TypeExtractor |
| .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 0) |
| .asInstanceOf[TypeInformation[T]] |
| |
| implicit val accTypeInfo: TypeInformation[ACC] = TypeExtractor |
| .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 1) |
| .asInstanceOf[TypeInformation[ACC]] |
| |
| registerAggregateFunctionInternal[T, ACC](name, f) |
| } |
| |
| /** |
| * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog. |
| * Registered functions can be referenced in Table API and SQL queries. |
| * |
| * @param name The name under which the function is registered. |
| * @param tf The TableFunction to register. |
| * @tparam T The type of the output row. |
| */ |
| def registerFunction[T](name: String, tf: TableFunction[T]): Unit = { |
| implicit val typeInfo: TypeInformation[T] = |
| UserDefinedFunctionUtils.getImplicitResultTypeInfo(tf) |
| registerTableFunctionInternal(name, tf) |
| } |
| |
| /** |
| * Registers a [[TableFunction]] under a unique name. Replaces already existing |
| * user-defined functions under this name. |
| */ |
| private[flink] def registerTableFunctionInternal[T: TypeInformation]( |
| name: String, function: TableFunction[T]): Unit = { |
| // check if class not Scala object |
| checkNotSingleton(function.getClass) |
| // check if class could be instantiated |
| checkForInstantiation(function.getClass) |
| val implicitResultType: DataType = |
      // we may use the argument types for result type inference later on.
| if (UserDefinedFunctionUtils.getResultTypeIgnoreException(function) != null) { |
| function.getResultType(null, null) |
| } else { |
| implicitly[TypeInformation[T]] |
| } |
| |
| // register in Table API |
| functionCatalog.registerFunction(name, function.getClass) |
| |
| // register in SQL API |
| val sqlFunctions = |
| createTableSqlFunction(name, name, function, implicitResultType, typeFactory) |
| functionCatalog.registerSqlFunction(sqlFunctions) |
| } |
| |
| /** |
| * Registers an [[AggregateFunction]] under a unique name. Replaces already existing |
| * user-defined functions under this name. |
| */ |
| private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation]( |
| name: String, |
| function: AggregateFunction[T, ACC]): Unit = { |
| // check if class not Scala object |
| checkNotSingleton(function.getClass) |
| // check if class could be instantiated |
| checkForInstantiation(function.getClass) |
| |
| val resultType = getResultTypeOfAggregateFunction(function, implicitly[TypeInformation[T]]) |
| val accType = getAccumulatorTypeOfAggregateFunction(function, implicitly[TypeInformation[ACC]]) |
| |
| // register in Table API |
| functionCatalog.registerFunction(name, function.getClass) |
| |
| // register in SQL API |
| val sqlFunctions = createAggregateSqlFunction( |
| name, |
| name, |
| function, |
| resultType, |
| accType, |
| typeFactory) |
| |
| functionCatalog.registerSqlFunction(sqlFunctions) |
| } |
| |
| /** |
| * Registers a [[Table]] under a unique name in the TableEnvironment's catalog. |
| * Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the table will be registered. |
| * @param table The table to register. |
| */ |
| def registerTable(name: String, table: Table): Unit = { |
| // check that table belongs to this table environment |
| if (table.tableEnv != this) { |
| throw new TableException( |
| "Only tables that belong to this TableEnvironment can be registered.") |
| } |
| |
| checkValidTableName(name) |
| val tableTable = new RelTable(table.getRelNode) |
| registerTableInternal(name, tableTable, replace = false) |
| } |
| |
| /** |
| * Registers a [[CatalogTable]] under a unique name in the TableEnvironment's default catalog. |
| * Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the table will be registered. |
| * @param catalogTable The table to register. |
| */ |
| def registerTable(name: String, catalogTable: CatalogTable): Unit = { |
| val catalog = getDefaultCatalog() |
| val path = new ObjectPath(getDefaultDatabaseName(), name) |
| catalog.createTable(path, catalogTable, false) |
| } |
| |
| /** |
    * Registers or replaces a [[Table]] under a unique name in the TableEnvironment's catalog.
| * Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the table will be registered. |
| * @param table The table to register. |
| */ |
| def registerOrReplaceTable(name: String, table: Table): Unit = { |
| // check that table belongs to this table environment |
| if (table.tableEnv != this) { |
| throw new TableException( |
| "Only tables that belong to this TableEnvironment can be registered.") |
| } |
| |
| checkValidTableName(name) |
| val tableTable = new RelTable(table.getRelNode) |
| registerTableInternal(name, tableTable, replace = true) |
| } |
| |
| /** |
    * Registers or replaces a [[CatalogTable]] under a unique name in the TableEnvironment's
    * default
| * catalog. Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the table will be registered. |
| * @param catalogTable The table to register. |
| */ |
| def registerOrReplaceTable(name: String, catalogTable: CatalogTable): Unit = { |
| val catalog = getDefaultCatalog() |
    val path = new ObjectPath(getDefaultDatabaseName(), name)
| |
| catalog.dropTable(path, true) |
| catalog.createTable(path, catalogTable, false) |
| } |
| |
| /** |
| * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. |
| * |
| * @param name The name under which the table will be registered. |
| * @param table The table to register in the catalog |
| * @throws TableAlreadyExistException if another table is registered under the provided name. |
| */ |
| @throws[TableAlreadyExistException] |
| private[flink] def registerTableInternal(name: String, table: AbstractTable): Unit = { |
| catalogManager.getCatalog(catalogManager.getDefaultCatalogName) |
| .asInstanceOf[ReadableWritableCatalog] |
| .createTable( |
| new ObjectPath(catalogManager.getDefaultDatabaseName, name), |
| createFlinkTempTable(table), |
| false |
| ) |
| } |
| |
| /** |
| * Replaces a registered Table with another Table under the same name. |
| * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]] |
| * with a [[org.apache.calcite.schema.TranslatableTable]]. |
| * |
| * @param name Name of the table to replace. |
| * @param table The table that replaces the previous table. |
| */ |
| protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = { |
| catalogManager.getCatalog(catalogManager.getDefaultCatalogName) |
| .asInstanceOf[ReadableWritableCatalog] |
| .alterTable( |
| new ObjectPath(catalogManager.getDefaultDatabaseName, name), |
| createFlinkTempTable(table), |
| false |
| ) |
| } |
| |
| private def createFlinkTempTable(table: AbstractTable): FlinkTempTable = { |
| val currentMillis = System.currentTimeMillis() |
| |
    new FlinkTempTable(
| table, |
| null, |
| null, |
| new util.HashMap[String, String](), |
| null, |
| null, |
| null, |
| new util.LinkedHashSet[String](), |
| false, |
| null, |
| null, |
| -1, |
| currentMillis, |
| currentMillis |
| ) |
| } |
| |
| /** |
| * Registers an external [[TableSource]] in this [[TableEnvironment]]'s catalog. |
| * Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the [[TableSource]] is registered. |
| * @param tableSource The [[TableSource]] to register. |
| */ |
| def registerTableSource(name: String, tableSource: TableSource): Unit = { |
| checkValidTableName(name) |
| registerTableSourceInternal(name, tableSource, FlinkStatistic.UNKNOWN, false) |
| } |
| |
| /** |
    * Registers or replaces an external [[TableSource]] in this [[TableEnvironment]]'s catalog.
| * Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the [[TableSource]] is registered. |
| * @param tableSource The [[TableSource]] to register. |
| */ |
| def registerOrReplaceTableSource(name: String, |
| tableSource: TableSource): Unit = { |
| checkValidTableName(name) |
| registerTableSourceInternal(name, tableSource, FlinkStatistic.UNKNOWN, true) |
| } |
| |
| /** |
| * Registers an internal [[TableSource]] in this [[TableEnvironment]]'s catalog without |
| * name checking. Registered tables can be referenced in SQL queries. |
| * |
| * @param name The name under which the [[TableSource]] is registered. |
    * @param tableSource The [[TableSource]] to register.
    * @param tableStats The statistics of the [[TableSource]].
| * @param replace Whether to replace this [[TableSource]] |
| */ |
| protected def registerTableSourceInternal(name: String, |
| tableSource: TableSource, |
| tableStats: FlinkStatistic, |
| replace: Boolean): Unit |
| |
| /** |
| * Gets the statistics of a table. |
    * Note: this method returns the current statistics of the table directly and does not
    * trigger a statistics gathering operation.
| * |
| * @param tableName The table name under which the table is registered in [[TableEnvironment]]. |
    *                  tableName must be a single name (e.g. "MyTable") associated with a table.
    * @return The statistics of the table if available, else null.
| */ |
| @Experimental |
| def getTableStats(tableName: String): TableStats = { |
| require(tableName != null && tableName.nonEmpty, "tableName must not be null or empty.") |
| getTableStats(Array(tableName)) |
| } |
| |
| /** |
| * Gets the statistics of a table. |
    * Note: this method returns the current statistics of the table directly and does not
    * trigger a statistics gathering operation.
| * |
| * @param tablePath The table name under which the table is registered in [[TableEnvironment]]. |
    *                  tablePath can be a single name (e.g. Array("MyTable")) associated with a
    *                  table, or a nested path (e.g. Array("MyCatalog", "MyDb", "MyTable"))
    *                  associated with a table registered as a member of a [[ReadableCatalog]].
    * @return The statistics of the table if available, else null.
| */ |
| @Experimental |
| def getTableStats(tablePath: Array[String]): TableStats = { |
| val tableOpt = getTable(tablePath) |
| if (tableOpt.isEmpty) { |
| throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") |
| } |
| val table = tableOpt.get |
| val tableName = tablePath.last |
| val stats = if (tablePath.length == 1) { |
| table match { |
| case t: FlinkTable => |
| // call statistic instead of getStatistics of FlinkTable to fetch the original statistics. |
| val statistics = t.getStatistic |
| if (statistics == null) { |
| None |
| } else { |
| Option(statistics.getTableStats) |
| } |
        // Only source tables extend FlinkTable for now
| case sourceSinkTable: TableSourceSinkTable[_] if sourceSinkTable.isSourceTable => |
| val statistics = sourceSinkTable.tableSourceTable.get.getStatistic |
| if (statistics == null) { |
| None |
| } else { |
| Option(statistics.getTableStats) |
| } |
| case _ => None |
| } |
| } else { |
| // table in catalogs |
| val path = catalogManager.resolveTableName(tablePath.toList) |
| val catalog = getCatalog(path(0)) |
| |
| Option(catalog.getTable(new ObjectPath(path(1), path(2))).getTableStats) |
| } |
| stats.orNull |
| } |
| |
| /** |
| * Alters the statistics of a table. |
| * |
| * @param tablePath The table name under which the table is registered in [[TableEnvironment]]. |
    *                  tablePath can be a single name (e.g. Array("MyTable")) associated with a
    *                  table, or a nested path (e.g. Array("MyCatalog", "MyDb", "MyTable"))
    *                  associated with a table registered as a member of a [[ReadableCatalog]].
| * @param tableStats The [[TableStats]] to update. |
| */ |
| def alterTableStats(tablePath: Array[String], tableStats: TableStats): Unit = { |
| alterTableStats(tablePath, Option(tableStats)) |
| } |
| |
| /** |
| * Alters the statistics of a table. |
| * |
| * @param tableName The table name under which the table is registered in [[TableEnvironment]]. |
| * tableName must be a single name(e.g. "MyTable") associated with a table. |
| * @param tableStats The [[TableStats]] to update. |
| */ |
| @Experimental |
| def alterTableStats(tableName: String, tableStats: Option[TableStats]): Unit = { |
| require(tableName != null && tableName.nonEmpty, "tableName must not be null or empty.") |
| alterTableStats(Array(tableName), tableStats) |
| } |
| |
| /** |
| * Alters the statistics of a table. |
| * |
| * @param tablePath The table name under which the table is registered in [[TableEnvironment]]. |
    *                  tablePath can be a single name (e.g. Array("MyTable")) associated with a
    *                  table, or a nested path (e.g. Array("MyCatalog", "MyDb", "MyTable"))
    *                  associated with a table registered as a member of a [[ReadableCatalog]].
| * @param tableStats The [[TableStats]] to update. |
| */ |
| @Experimental |
| def alterTableStats(tablePath: Array[String], tableStats: Option[TableStats]): Unit = { |
| val tableOpt = getTable(tablePath) |
| if (tableOpt.isEmpty) { |
| throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") |
| } |
| |
| val table = tableOpt.get |
| val tableName = tablePath.last |
| if (tablePath.length == 1) { |
| // table in calcite root schema |
| val statistic = table match { |
| // call statistic instead of getStatistics of TableSourceTable |
| // to fetch the original statistics. |
| case t: TableSourceSinkTable[_] if t.isSourceTable => t.tableSourceTable.get.statistic |
| case t: FlinkTable => t.getStatistic |
| case _ => throw new TableException( |
| s"alter TableStats operation is not supported for ${table.getClass}.") |
| } |
| val oldStatistic = if (statistic == null) FlinkStatistic.UNKNOWN else statistic |
| val newStatistic = FlinkStatistic.builder.statistic(oldStatistic). |
| tableStats(tableStats.orNull).build() |
| val newTable = table.asInstanceOf[FlinkTable].copy(newStatistic) |
| replaceRegisteredTable(tableName, newTable) |
| } else { |
| // table in catalogs |
| val path = catalogManager.resolveTableName(tablePath.toList) |
| val catalog = getCatalog(path(0)) |
| |
| // TODO: [BLINK-18570617] re-enable alter catalog table stats in TableEnvironment |
| // catalog match { |
| // case c: ReadableWritableCatalog => |
| // c.alterTableStats(tableName, tableStats, ignoreIfNotExists = false) |
| // case _ => throw new TableException( |
| // s"alterTableStats operation is not supported for ${catalog.getClass}.") |
| // } |
| |
| throw new TableException( |
| s"catalogs haven't supportted alterTableStats operation yet.") |
| } |
| } |
| |
| /** |
    * Alters the skew info of a table. The optimizer will try to choose a better plan for
    * skewed data:
    * 1. pick the skewed values to join separately
    * 2. prefer to add a local-combine aggregate before the global aggregate that groups by
    * skewed data
    *
    * TODO: Add skewInfo on a specified set of columns later. Currently skewInfo can only be
    * specified on a single key, which is not enough to determine whether a specified set of
    * columns from a relational expression is skewed or not.
| * |
| * @param tableName table name to alter. |
| * @param skewInfo statistics of skewedColNames and skewedColValues. |
| */ |
| def alterSkewInfo( |
| tableName: String, |
| skewInfo: util.Map[String, util.List[AnyRef]]): Unit = { |
| require(tableName != null && tableName.nonEmpty, "tableName must not be null or empty.") |
| alterSkewInfo(Array(tableName), skewInfo) |
| } |
| |
| private def alterSkewInfo( |
| tablePath: Array[String], |
| skewInfo: util.Map[String, util.List[AnyRef]]): Unit = { |
| val tableOpt = getTable(tablePath) |
| if (tableOpt.isEmpty) { |
| throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") |
| } |
| |
| val table = tableOpt.get |
| val tableName = tablePath.last |
| if (tablePath.length == 1) { |
| // table in calcite root schema |
| val statistic = table match { |
| // call statistic instead of getStatistics of TableSourceTable |
| // to fetch the original statistics. |
| case t: TableSourceSinkTable[_] if t.isSourceTable => t.tableSourceTable.get.statistic |
| case t: FlinkTable => t.getStatistic |
| case _ => throw new TableException( |
| s"alter SkewInfo operation is not supported for ${table.getClass}.") |
| } |
| val oldStatistic = if (statistic == null) FlinkStatistic.UNKNOWN else statistic |
| val newStatistic = FlinkStatistic.builder.statistic(oldStatistic).skewInfo(skewInfo).build() |
| val newTable = table.asInstanceOf[FlinkTable].copy(newStatistic) |
| replaceRegisteredTable(tableName, newTable) |
| } else { |
| throw new TableException("alterSkewInfo operation is not supported for external catalog.") |
| } |
| } |
| |
| /** |
| * Registers an external [[TableSink]] with given field names and types in this |
| * [[TableEnvironment]]'s catalog. |
| * Registered sink tables can be referenced in SQL DML statements. |
| * |
| * @param name The name under which the [[TableSink]] is registered. |
| * @param fieldNames The field names to register with the [[TableSink]]. |
| * @param fieldTypes The field types to register with the [[TableSink]]. |
| * @param tableSink The [[TableSink]] to register. |
| */ |
| def registerTableSink( |
| name: String, |
| fieldNames: Array[String], |
| fieldTypes: Array[DataType], |
| tableSink: TableSink[_]): Unit = { |
| registerTableSinkInternal(name, fieldNames, fieldTypes, tableSink, false) |
| } |
| |
| /** |
    * Registers or replaces an external [[TableSink]] with given field names and types in this
| * [[TableEnvironment]]'s catalog. |
| * Registered sink tables can be referenced in SQL DML statements. |
| * |
| * @param name The name under which the [[TableSink]] is registered. |
| * @param fieldNames The field names to register with the [[TableSink]]. |
| * @param fieldTypes The field types to register with the [[TableSink]]. |
| * @param tableSink The [[TableSink]] to register. |
| */ |
| def registerOrReplaceTableSink( |
| name: String, |
| fieldNames: Array[String], |
| fieldTypes: Array[DataType], |
| tableSink: TableSink[_]): Unit = { |
| registerTableSinkInternal(name, fieldNames, fieldTypes, tableSink, true) |
| } |
| |
| /** |
    * Registers or replaces an external [[TableSink]] with given field names and types in this
| * [[TableEnvironment]]'s catalog. |
| * Registered sink tables can be referenced in SQL DML statements. |
| * |
| * @param name The name under which the [[TableSink]] is registered. |
| * @param fieldNames The field names to register with the [[TableSink]]. |
| * @param fieldTypes The field types to register with the [[TableSink]]. |
| * @param tableSink The [[TableSink]] to register. |
    * @param replace Whether to replace this [[TableSink]].
| */ |
| protected def registerTableSinkInternal( |
| name: String, |
| fieldNames: Array[String], |
| fieldTypes: Array[DataType], |
| tableSink: TableSink[_], |
| replace: Boolean): Unit |
| |
| /** |
| * Registers an external [[TableSink]] with already configured field names and field types in |
| * this [[TableEnvironment]]'s catalog. |
| * Registered sink tables can be referenced in SQL DML statements. |
| * |
| * @param name The name under which the [[TableSink]] is registered. |
| * @param configuredSink The configured [[TableSink]] to register. |
| */ |
| def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = { |
| registerTableSinkInternal(name, configuredSink, false) |
| } |
| |
| /** |
    * Registers or replaces an external [[TableSink]] with already configured field names and
| * field types in this [[TableEnvironment]]'s catalog. |
| * Registered sink tables can be referenced in SQL DML statements. |
| * |
| * @param name The name under which the [[TableSink]] is registered. |
| * @param configuredSink The configured [[TableSink]] to register. |
| */ |
| def registerOrReplaceTableSink(name: String, configuredSink: TableSink[_]): Unit = { |
| registerTableSinkInternal(name, configuredSink, true) |
| } |
| |
| /** |
| * Registers an external [[TableSink]] with already configured field names and field types in |
| * this [[TableEnvironment]]'s catalog. |
| * Registered sink tables can be referenced in SQL DML statements. |
| * |
| * @param name The name under which the [[TableSink]] is registered. |
| * @param configuredSink The configured [[TableSink]] to register. |
| */ |
| protected def registerTableSinkInternal(name: String, |
| configuredSink: TableSink[_], |
| replace: Boolean): Unit |
| |
| |
| private[flink] def getStateTableNameForWrite(name: String): String = { |
| s"__W_$name" |
| } |
| |
| private[flink] def collect[T]( |
| table: Table, |
| sink: CollectTableSink[T], |
| jobName: Option[String]): Seq[T] = { |
| throw new TableException(s"collect is not supported.") |
| } |
| |
| /** |
| * Scans a registered table and returns the resulting [[Table]]. |
| * |
| * A table to scan must be registered in the catalog. It can be either directly |
    * registered as DataStream, DataSet, or Table, or as a member of a [[ReadableCatalog]].
| * |
| * Examples: |
| * |
| * - Scanning a directly registered table |
| * {{{ |
| * val tab: Table = tableEnv.scan("tableName") |
| * }}} |
| * |
| * - Scanning a table from a registered catalog |
| * {{{ |
| * val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName") |
| * }}} |
| * |
| * @param tablePath The path of the table to scan. |
| * @throws TableException if no table is found using the given table path. |
| * @return The resulting [[Table]]. |
| */ |
| @throws[TableException] |
| @varargs |
| def scan(tablePath: String*): Table = { |
| scanInternal(catalogManager.resolveTableName(tablePath.toArray : _*)) match { |
| case Some(table) => table |
| case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") |
| } |
| } |
| |
| private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = { |
| val tableOpt = getTable(tablePath) |
| if (tableOpt.nonEmpty) { |
| Some(new Table(this, CatalogNode(tablePath, tableOpt.get.getRowType(typeFactory)))) |
| } else { |
| None |
| } |
| } |
| |
| /** |
| * Creates a table source and/or table sink from a descriptor. |
| * |
| * Descriptors allow for declaring the communication to external systems in an |
| * implementation-agnostic way. The classpath is scanned for suitable table factories that match |
| * the desired configuration. |
| * |
| * The following example shows how to read from a connector using a JSON format and |
| * registering a table source as "MyTable": |
| * |
| * {{{ |
| * |
| * tableEnv |
| * .connect( |
| * new ExternalSystemXYZ() |
| * .version("0.11")) |
| * .withFormat( |
| * new Json() |
| * .jsonSchema("{...}") |
| * .failOnMissingField(false)) |
| * .withSchema( |
| * new Schema() |
| * .field("user-name", "VARCHAR").from("u_name") |
| * .field("count", "DECIMAL") |
| * .registerSource("MyTable") |
| * }}} |
| * |
| * @param connectorDescriptor connector descriptor describing the external system |
| */ |
| def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor |
| |
| private def getSchema(schemaPath: Array[String]): SchemaPlus = { |
| var schema = currentSchema |
| for (schemaName <- schemaPath) { |
| schema = schema.getSubSchema(schemaName) |
| if (schema == null) { |
| return schema |
| } |
| } |
| schema |
| } |
| |
| /** |
| * Gets the names of all catalogs registered in this environment. |
| * |
| * @return A list of the names of all registered catalogs. |
| */ |
| def listCatalogs(): Array[String] = { |
| catalogManager.getCatalogs.asScala.toArray |
| } |
| |
| /** |
| * Gets the names of all databases registered in the default catalog. |
| * |
| * @return A list of the names of all registered databases. |
| */ |
| def listDatabases(): Array[String] = { |
| catalogManager.getDefaultCatalog.listDatabases().asScala.toArray |
| } |
| |
| /** |
| * Gets the names of all tables registered in the default database. |
| * |
| * @return A list of the names of all registered tables. |
| */ |
| def listTables(): Array[String] = { |
| catalogManager.getDefaultCatalog.listTables(catalogManager.getDefaultDatabaseName) |
| .map(op => op.getObjectName) |
| .toArray |
| } |
| |
| /** |
| * Gets the names of all functions registered in this environment. |
| */ |
| def listUserDefinedFunctions(): Array[String] = { |
| chainedFunctionCatalog.getSqlOperatorTable.getOperatorList.map(e => e.getName).toArray |
| } |
| |
| /** |
| * Returns the AST of the specified Table API and SQL queries and the execution plan to compute |
| * the result of the given [[Table]]. |
| * |
| * @param table The table for which the AST and execution plan will be returned. |
| */ |
| def explain(table: Table): String |
| |
| /** |
    * Explains the whole plan (only supported when subsection optimization is enabled), and
    * returns the AST of the specified Table API and SQL queries and the execution plan.
| * |
| * @param extended Flag to include detailed optimizer estimates. |
| */ |
| def explain(extended: Boolean = false): String |
| |
| /** |
| * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. |
| * |
| * All tables referenced by the query must be registered in the TableEnvironment. |
| * A [[Table]] is automatically registered when its [[toString]] method is called, for example |
| * when it is embedded into a String. |
| * Hence, SQL queries can directly reference a [[Table]] as follows: |
| * |
| * {{{ |
| * val table: Table = ... |
| * // the table is not registered to the table environment |
| * tEnv.sqlQuery(s"SELECT * FROM $table") |
| * }}} |
| * |
| * @param query The SQL query to evaluate. |
| * @return The result of the query as Table |
| */ |
| def sqlQuery(query: String): Table = { |
| // parse the sql query |
| val parsed = flinkPlanner.parse(query) |
| if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { |
| // validate the sql query |
| val validated = flinkPlanner.validate(parsed) |
| // transform to a relational tree |
| val relational = flinkPlanner.rel(validated) |
| new Table(this, LogicalRelNode(relational.project())) |
| } else { |
| throw new TableException( |
| "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " + |
| "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.") |
| } |
| } |
| |
| /** |
| * Returns specific FlinkCostFactory of TableEnvironment's subclass. |
| */ |
| protected def getFlinkCostFactory: FlinkCostFactory |
| |
| /** |
    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
    * NOTE: Currently only SQL INSERT statements are supported.
| * |
| * All tables referenced by the query must be registered in the TableEnvironment. |
| * A [[Table]] is automatically registered when its [[toString]] method is called, for example |
| * when it is embedded into a String. |
| * Hence, SQL queries can directly reference a [[Table]] as follows: |
| * |
| * {{{ |
| * // register the table sink into which the result is inserted. |
| * tEnv.registerTableSink("sinkTable", fieldNames, fieldsTypes, tableSink) |
| * val sourceTable: Table = ... |
| * // sourceTable is not registered to the table environment |
| * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable") |
| * }}} |
| * |
| * @param stmt The SQL statement to evaluate. |
| */ |
| def sqlUpdate(stmt: String): Unit = { |
| // parse the sql query |
| val parsed = flinkPlanner.parse(stmt) |
| parsed match { |
| case insert: SqlInsert => |
| if (insert.getTargetTable.isInstanceOf[SqlIdentifier] && |
| insert.getTargetTable.asInstanceOf[SqlIdentifier].toString.equals("console") && |
| getTable("console").isEmpty) { |
| val source = flinkPlanner.validate(insert.getSource) |
| val queryResult = new Table(this, LogicalRelNode(flinkPlanner.rel(source).rel)) |
| val schema = queryResult.getSchema |
| val printTableSink = new PrintTableSink(getConfig.getTimeZone).configure( |
| schema.getColumnNames, schema.getTypes.asInstanceOf[Array[DataType]]) |
| writeToSink(queryResult, printTableSink, "console") |
| return |
| } |
| // validate the insert sql |
| val validated = flinkPlanner.validate(insert) |
| // transform to a relational tree |
        val relational: LogicalTableModify = flinkPlanner.rel(validated).rel
| .asInstanceOf[LogicalTableModify] |
| |
| // get query result as Table |
| val queryResult = new Table(this, LogicalRelNode(relational.getInput(0))) |
| |
| // get name of sink table |
| val targetTable = relational.getTable |
| |
| // set emit configs |
| val emit = insert.getEmit |
| if (emit != null && this.isInstanceOf[StreamTableEnvironment]) { |
| if (emit.getBeforeDelayValue >= 0) { |
| getConfig.withEarlyFireInterval(Time.milliseconds(emit.getBeforeDelayValue)) |
| } |
| if (emit.getAfterDelayValue >= 0) { |
| getConfig.withLateFireInterval(Time.milliseconds(emit.getAfterDelayValue)) |
| } |
| } |
| |
| // insert query result into sink table |
| insertInto( |
| queryResult, |
| targetTable.unwrap(classOf[schema.Table]), |
| StringUtils.join(targetTable.getQualifiedName, ",")) |
| case _ => |
| throw new TableException( |
| "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") |
| } |
| } |
| |
| /** |
    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
    * NOTE: Currently only SQL INSERT statements are supported.
| * |
| * All tables referenced by the query must be registered in the TableEnvironment. |
| * A [[Table]] is automatically registered when its [[toString]] method is called, for example |
| * when it is embedded into a String. |
| * Hence, SQL queries can directly reference a [[Table]] as follows: |
| * |
| * {{{ |
| * // register the table sink into which the result is inserted. |
| * tEnv.registerTableSink("sinkTable", fieldNames, fieldsTypes, tableSink) |
| * val sourceTable: Table = ... |
| * // sourceTable is not registered to the table environment |
| * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable") |
| * }}} |
| * |
| * @param stmt The SQL statement to evaluate. |
| * @param config The [[QueryConfig]] to use. |
| */ |
| def sqlUpdate(stmt: String, config: QueryConfig): Unit = { |
| config.overrideTableConfig(getConfig) |
| sqlUpdate(stmt) |
| } |
| |
| /** |
| * Writes a [[Table]] to a [[TableSink]]. |
| * |
| * @param table The [[Table]] to write. |
| * @param sink The [[TableSink]] to write the [[Table]] to. |
| * @tparam T The data type that the [[TableSink]] expects. |
| */ |
| private[table] def writeToSink[T]( |
| table: Table, |
| sink: TableSink[T], |
| sinkName: String = null): Unit |
| |
| /** |
| * Triggers the program execution. |
| */ |
| def execute(): JobExecutionResult = execute(DEFAULT_JOB_NAME) |
| |
| /** |
| * Triggers the program execution with jobName. |
| */ |
| def execute(jobName: String): JobExecutionResult |
| |
| /** |
| * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name. |
| * |
| * @param table The table to write to the TableSink. |
| * @param sinkTableName The name of the registered TableSink. |
| */ |
| private[flink] def insertInto(table: Table, sinkTableName: String): Unit = { |
| |
| // check that sink table exists |
| if (null == sinkTableName || sinkTableName.isEmpty) { |
| throw new TableException(TableErrors.INST.sqlInvalidSinkTblName()) |
| } |
| |
| if (!catalogManager.getDefaultCatalog() |
| .tableExists(new ObjectPath(catalogManager.getDefaultDatabaseName, sinkTableName))) { |
| throw new TableException(TableErrors.INST.sqlTableNotRegistered(sinkTableName)) |
| } |
| val targetTable = getTable(sinkTableName).get |
| |
| insertInto(table, targetTable, sinkTableName) |
| } |
| |
| private def insertInto( |
| sourceTable: Table, |
| targetTable: schema.Table, |
| targetTableName: String) = { |
| val tableSink = targetTable match { |
| case s: CatalogCalciteTable => s.tableSink |
| case s: TableSinkTable[_] => s.tableSink |
| case s: TableSourceSinkTable[_] if s.tableSinkTable.isDefined => |
| s.tableSinkTable.get.tableSink |
| case _ => |
| throw new TableException(TableErrors.INST.sqlNotTableSinkError(targetTableName)) |
| |
| } |
| |
| // validate schema of source table and table sink |
| val srcFieldTypes = sourceTable.getSchema.getTypes |
| val sinkFieldTypes = tableSink.getFieldTypes.map(_.toInternalType) |
| |
| val srcFieldNames = sourceTable.getSchema.getColumnNames |
| val sinkFieldNames = tableSink.getFieldNames |
| |
| val srcNameTypes = srcFieldNames.zip(srcFieldTypes) |
| val sinkNameTypes = sinkFieldNames.zip(sinkFieldTypes) |
| |
| def typeMatch(t1: InternalType, t2: InternalType): Boolean = { |
| t1 == t2 || |
| (t1.isInstanceOf[DateType] && t2.isInstanceOf[DateType]) || |
| (t1.isInstanceOf[TimestampType] && t2.isInstanceOf[TimestampType]) |
| |
| } |
| |
| if (srcFieldTypes.length != sinkFieldTypes.length) { |
| // format table and table sink schema strings |
| val srcSchema = srcNameTypes |
| .map { case (n, t) => s"$n: ${TypeUtils.getExternalClassForType(t)}" } |
| .mkString("[", ", ", "]") |
| |
| val sinkSchema = sinkNameTypes |
| .map { case (n, t) => s"$n: ${TypeUtils.getExternalClassForType(t)}" } |
| .mkString("[", ", ", "]") |
| |
| throw new ValidationException( |
| TableErrors.INST.sqlInsertIntoMismatchedFieldLen( |
| targetTableName, srcSchema, sinkSchema)) |
| } else if (srcFieldTypes.zip(sinkFieldTypes) |
| .exists { |
| case (_: GenericType[_], _: GenericType[_]) => false |
| case (srcF, snkF) => !typeMatch(srcF, snkF) |
| } |
| ) { |
| val diffNameTypes = srcNameTypes.zip(sinkNameTypes) |
| .filter { |
| case ((_, srcType), (_, sinkType)) => !typeMatch(srcType, sinkType) |
| } |
| val srcDiffMsg = diffNameTypes |
| .map(_._1) |
| .map { case (n, t) => s"$n: ${TypeUtils.getExternalClassForType(t)}" } |
| .mkString("[", ", ", "]") |
| val sinkDiffMsg = diffNameTypes |
| .map(_._2) |
| .map { case (n, t) => s"$n: ${TypeUtils.getExternalClassForType(t)}" } |
| .mkString("[", ", ", "]") |
| |
| throw new ValidationException( |
| TableErrors.INST.sqlInsertIntoMismatchedFieldTypes( |
| targetTableName, srcDiffMsg, sinkDiffMsg)) |
| } |
| |
| // emit the table to the configured table sink |
| writeToSink(sourceTable, tableSink, targetTableName) |
| } |
| |
| /** |
| * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. |
| * |
| * @param name The name under which the table will be registered. |
| * @param table The table to register in the catalog. |
| * @param replace Whether to replace the registered table. |
| * @throws TableException if another table is registered under the provided name. |
| */ |
| @throws[TableException] |
| protected def registerTableInternal(name: String, |
| table: AbstractTable, |
| replace: Boolean): Unit = { |
| if (replace) { |
| catalogManager.getCatalog(catalogManager.getDefaultCatalogName) |
| .asInstanceOf[ReadableWritableCatalog] |
| .dropTable(new ObjectPath(catalogManager.getDefaultDatabaseName, name), true) |
| } |
| catalogManager.getCatalog(catalogManager.getDefaultCatalogName) |
| .asInstanceOf[ReadableWritableCatalog] |
| .createTable( |
| new ObjectPath(catalogManager.getDefaultDatabaseName, name), |
| createFlinkTempTable(table), |
| false |
| ) |
| } |
| |
| /** |
| * Checks if the chosen table name is valid. |
| * |
| * @param name The table name to check. |
| */ |
| protected def checkValidTableName(name: String): Unit = {} |
| |
| /** |
| * Close the table environment. This method will clean up the internal state and background |
| * services. Users should invoke this method if possible to avoid resource leak. |
| */ |
| def close(): Unit = { |
| if (closed.compareAndSet(false, true)) { |
| tableServiceManager.close() |
| } |
| } |
| |
  /** Returns a unique temporary attribute name with the given prefix. */
| private[flink] def createUniqueAttributeName(prefix: String): String = { |
| prefix + attrNameCntr.getAndIncrement() |
| } |
| |
| /** Returns a unique temporary attribute name. */ |
| private[flink] def createUniqueAttributeName(): String = { |
| "TMP_" + attrNameCntr.getAndIncrement() |
| } |
| |
| /** Returns a unique table name according to the internal naming pattern. */ |
| private[flink] def createUniqueTableName(): String = { |
| var res = tableNamePrefix + tableNameCntr.getAndIncrement() |
| while (getTable(res).nonEmpty) { |
| res = tableNamePrefix + tableNameCntr.getAndIncrement() |
| } |
| res |
| } |
| |
| /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */ |
| private[flink] def getRelBuilder: FlinkRelBuilder = { |
| relBuilder |
| } |
| |
| /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */ |
| private[flink] def getPlanner: RelOptPlanner = { |
| planner |
| } |
| |
| /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */ |
| private[flink] def getTypeFactory: FlinkTypeFactory = { |
| typeFactory |
| } |
| |
| /** Returns the chained [[FunctionCatalog]]. */ |
| private[flink] def getFunctionCatalog: FunctionCatalog = { |
| chainedFunctionCatalog |
| } |
| |
| private def createFrameworkConfig: FrameworkConfig = { |
| Frameworks |
| .newConfigBuilder |
| .defaultSchema(currentSchema) |
| .parserConfig(getSqlParserConfig) |
| .costFactory(getFlinkCostFactory) |
| .operatorTable(getSqlOperatorTable) |
| // set the executor to evaluate constant expressions |
| .executor(new ExpressionReducer(config)) |
| .context(FlinkChainContext.chain(Contexts.of(config), Contexts.of(chainedFunctionCatalog))) |
| .build |
| } |
| |
| /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */ |
| private[flink] def getFrameworkConfig: FrameworkConfig = { |
| frameworkConfig |
| } |
| |
| protected def createRelBuilder: FlinkRelBuilder = { |
| FlinkRelBuilder.create( |
| frameworkConfig, config, getTypeFactory, catalogManager = catalogManager) |
| } |
| |
| private def createRelOptPlanner: RelOptPlanner = { |
| relBuilder.getPlanner |
| } |
| |
| private def createFlinkPlanner: FlinkPlannerImpl = { |
| new FlinkPlannerImpl( |
| getFrameworkConfig, |
| getPlanner, |
| getTypeFactory, |
| sqlToRelConverterConfig, |
| relBuilder.getCluster, |
| catalogManager) |
| } |
| |
| /** |
| * Reference input fields by name: |
| * All fields in the schema definition are referenced by name |
    * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
| * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary |
| * positions using arbitrary names (except those that exist in the result schema). This mode |
| * can be used for any input type, including POJOs. |
| * |
| * Reference input fields by position: |
| * In this mode, fields are simply renamed. Event-time attributes can |
| * replace the field on their position in the input data (if it is of correct type) or be |
| * appended at the end. Proctime attributes must be appended at the end. This mode can only be |
    * used if the input type has a defined field order (tuple, case class, Row) and none of
    * the fields references a field of the input type by name.
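    *
    * Example (a sketch; for a Tuple3 input with fields f0, f1, f2):
    * {{{
    *   // by position: none of the names exists in the input, fields are simply renamed
    *   stream.toTable(tEnv, 'a, 'b, 'c)
    *   // by name: existing fields are referenced and may be reordered or projected out
    *   stream.toTable(tEnv, 'f2, 'f0)
    * }}}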
| */ |
| protected def isReferenceByPosition(ct: RowType, fields: Array[Expression]): Boolean = { |
| |
| val inputNames = ct.getFieldNames |
| |
    // Use the by-position mode if none of the fields exists in the input.
| // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed |
| // by position but the user might assume reordering instead of renaming. |
| fields.forall { |
| case UnresolvedFieldReference(name) => !inputNames.contains(name) |
| case Alias(_, _, _) => false |
| case _ => true |
| } |
| } |
| |
| /** |
    * Returns field names and field positions for a given [[DataType]].
    *
    * @param inputType The [[DataType]] to extract the field names and positions from.
| * @return A tuple of two arrays holding the field names and corresponding field positions. |
| */ |
| protected[flink] def getFieldInfo(inputType: DataType): |
| (Array[String], Array[Int]) = { |
| |
| (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) |
| } |
| |
| /** |
    * Returns field names and field positions for a given [[DataType]] and [[Array]] of
| * [[Expression]]. It does not handle time attributes but considers them in indices. |
| * |
| * @param inputType The [[DataType]] against which the [[Expression]]s are evaluated. |
| * @param exprs The expressions that define the field names. |
| * @return A tuple of two arrays holding the field names and corresponding field positions. |
| */ |
| protected[flink] def getFieldInfo[A]( |
| inputType: DataType, |
| exprs: Array[Expression]) |
| : (Array[String], Array[Int]) = { |
| |
| TableEnvironment.validateType(inputType) |
| |
| def referenceByName(name: String, ct: RowType): Option[Int] = { |
| val inputIdx = ct.getFieldIndex(name) |
| if (inputIdx < 0) { |
throw new TableException(s"$name is not a field of type $ct. " +
  s"Expected: ${ct.getFieldNames.mkString(", ")}. " +
  s"Make sure that no field of the physical data type is referenced " +
  s"if you want to refer to fields by position.")
| } else { |
| Some(inputIdx) |
| } |
| } |
| |
| val indexedNames: Array[(Int, String)] = inputType.toInternalType match { |
| |
| case t: RowType => |
| |
| val isRefByPos = isReferenceByPosition(t, exprs) |
| exprs.zipWithIndex flatMap { |
| case (UnresolvedFieldReference(name: String), idx) => |
| if (isRefByPos) { |
| Some((idx, name)) |
| } else { |
| referenceByName(name, t).map((_, name)) |
| } |
| case (Alias(UnresolvedFieldReference(origName), name: String, _), _) => |
| if (isRefByPos) { |
| throw new TableException( |
| s"Alias '$name' is not allowed if other fields are referenced by position.") |
| } else { |
| referenceByName(origName, t).map((_, name)) |
| } |
| case (_: TimeAttribute, _) => |
| None |
| case _ => throw new TableException( |
| "Field reference expression or alias on field expression expected.") |
| } |
| |
| case _: InternalType => // atomic or other custom type information |
| var referenced = false |
| exprs flatMap { |
| case _: TimeAttribute => |
| None |
| case UnresolvedFieldReference(_) if referenced => |
| // only accept the first field for an atomic type |
| throw new TableException("Only the first field can reference an atomic type.") |
| case UnresolvedFieldReference(name: String) => |
| referenced = true |
| // first field reference is mapped to atomic type |
| Some((0, name)) |
| case _ => throw new TableException( |
| "Field reference expression expected.") |
| } |
| } |
| |
| val (fieldIndexes, fieldNames) = indexedNames.unzip |
| |
| if (fieldNames.contains("*")) { |
throw new TableException("Field name cannot be '*'.")
| } |
| |
| (fieldNames, fieldIndexes) |
| } |
| |
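/** Sets the user [[ClassLoader]] of this TableEnvironment. */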
| def setUserClassLoader(userClassLoader: ClassLoader): Unit = { |
| this.userClassloader = userClassLoader |
| } |
| } |
| |
| /** |
| * Object to instantiate a [[TableEnvironment]] depending on the batch or stream execution |
| * environment. |
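 *
 * For example (a minimal sketch using the factory methods of this object, assuming the
 * Scala StreamExecutionEnvironment is imported):
 * {{{
 *   val env = StreamExecutionEnvironment.getExecutionEnvironment
 *   val tEnv = TableEnvironment.getTableEnvironment(env)
 * }}}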
| */ |
| object TableEnvironment { |
| |
| /** |
 * The name of the default schema (the key of the default external catalog).
| */ |
| val DEFAULT_SCHEMA: String = "hive" |
| |
| /** |
| * Returns a [[BatchTableEnvironment]] for a Java [[JavaStreamExecEnv]]. |
| * |
 * @param executionEnvironment The Java StreamExecutionEnvironment on which the batch
 *                             program is executed.
| */ |
| def getBatchTableEnvironment( |
| executionEnvironment: JavaStreamExecEnv): JavaBatchTableEnvironment = { |
| new JavaBatchTableEnvironment(executionEnvironment, new TableConfig()) |
| } |
| |
| /** |
| * Returns a [[BatchTableEnvironment]] for a Java [[JavaStreamExecEnv]] and a given |
| * [[TableConfig]]. |
| * |
 * @param executionEnvironment The Java StreamExecutionEnvironment on which the batch
 *                             program is executed.
| * @param tableConfig The TableConfig for the new TableEnvironment. |
| */ |
| def getBatchTableEnvironment( |
| executionEnvironment: JavaStreamExecEnv, |
| tableConfig: TableConfig): JavaBatchTableEnvironment = { |
| new JavaBatchTableEnvironment(executionEnvironment, tableConfig) |
| } |
| |
| /** |
 * Returns a [[ScalaBatchTableEnvironment]] for a Scala [[ScalaStreamExecEnv]].
| * |
| * @param executionEnvironment The Scala StreamExecutionEnvironment. |
| */ |
| def getBatchTableEnvironment( |
| executionEnvironment: ScalaStreamExecEnv): ScalaBatchTableEnvironment = { |
| new ScalaBatchTableEnvironment(executionEnvironment, new TableConfig()) |
| } |
| |
| /** |
 * Returns a [[ScalaBatchTableEnvironment]] for a Scala [[ScalaStreamExecEnv]] and a given
 * [[TableConfig]].
| * |
| * @param executionEnvironment The Scala StreamExecutionEnvironment. |
| * @param tableConfig The TableConfig for the new TableEnvironment. |
| */ |
| def getBatchTableEnvironment( |
| executionEnvironment: ScalaStreamExecEnv, |
| tableConfig: TableConfig): ScalaBatchTableEnvironment = { |
| |
| new ScalaBatchTableEnvironment(executionEnvironment, tableConfig) |
| } |
| |
| /** |
| * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]]. |
| * |
| * @param executionEnvironment The Java StreamExecutionEnvironment. |
| */ |
| def getTableEnvironment(executionEnvironment: JavaStreamExecEnv): JavaStreamTableEnv = { |
| new JavaStreamTableEnv(executionEnvironment, new TableConfig()) |
| } |
| |
| /** |
| * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]] and a given [[TableConfig]]. |
| * |
| * @param executionEnvironment The Java StreamExecutionEnvironment. |
| * @param tableConfig The TableConfig for the new TableEnvironment. |
| */ |
| def getTableEnvironment( |
| executionEnvironment: JavaStreamExecEnv, |
| tableConfig: TableConfig): JavaStreamTableEnv = { |
| |
| new JavaStreamTableEnv(executionEnvironment, tableConfig) |
| } |
| |
| /** |
 * Returns a [[ScalaStreamTableEnv]] for a Scala [[ScalaStreamExecEnv]].
| * |
| * @param executionEnvironment The Scala StreamExecutionEnvironment. |
| */ |
| def getTableEnvironment(executionEnvironment: ScalaStreamExecEnv): ScalaStreamTableEnv = { |
| new ScalaStreamTableEnv(executionEnvironment, new TableConfig()) |
| } |
| |
| /** |
 * Returns a [[ScalaStreamTableEnv]] for a Scala [[ScalaStreamExecEnv]] and a given
 * [[TableConfig]].
| * |
| * @param executionEnvironment The Scala StreamExecutionEnvironment. |
| * @param tableConfig The TableConfig for the new TableEnvironment. |
| */ |
| def getTableEnvironment( |
| executionEnvironment: ScalaStreamExecEnv, |
| tableConfig: TableConfig): ScalaStreamTableEnv = { |
| |
| new ScalaStreamTableEnv(executionEnvironment, tableConfig) |
| } |
| |
| /** |
 * Validates that the class represented by the given type is static and globally
 * accessible.
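 *
 * For example (illustrative), a non-static member class is rejected because it cannot be
 * instantiated without an enclosing instance:
 * {{{
 *   class Outer { class Inner }  // Inner is a non-static member class -> TableException
 * }}}
 *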
 * @param t The type to check.
 * @throws TableException if the type does not meet these criteria.
| */ |
| def validateType(t: DataType): Unit = { |
| val clazz = TypeUtils.getExternalClassForType(t) |
| if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || |
| !Modifier.isPublic(clazz.getModifiers) || |
| clazz.getCanonicalName == null) { |
| throw new TableException(s"Class '$clazz' described in type information '$t' must be " + |
| s"static and globally accessible.") |
| } |
| } |
| |
| /** |
 * Returns the row type of a [[TableSink]]. For [[UpsertStreamTableSink]] and
 * [[RetractStreamTableSink]] this is the record type; for all other sinks it is the
 * output type.
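 * For example, an [[UpsertStreamTableSink]] emits `(Boolean, T)` change-flag pairs as its
 * output type, while its record type is the plain `T`.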
 * @param tableSink The TableSink to extract the row type from.
 * @tparam A The data type that the TableSink expects.
 * @return The row type of the given sink.
| */ |
| def getRowTypeForTableSink[A](tableSink: TableSink[A]): DataType = { |
| tableSink match { |
| case u: UpsertStreamTableSink[A] => u.getRecordType |
| case r: RetractStreamTableSink[A] => r.getRecordType |
| case _ => tableSink.getOutputType |
| } |
| } |
| |
| /** |
 * Returns field names for a given [[DataType]].
| * |
 * @param inputType The DataType to extract the field names from.
 * @return An array holding the field names.
| */ |
| def getFieldNames(inputType: DataType): Array[String] = { |
| validateType(inputType) |
| |
| val fieldNames: Array[String] = inputType.toInternalType match { |
| case t: RowType => t.getFieldNames |
| case _: InternalType => Array("f0") |
| } |
| |
| if (fieldNames.contains("*")) { |
throw new TableException("Field name cannot be '*'.")
| } |
| |
| fieldNames |
| } |
| |
| /** |
 * Returns field indexes for a given [[DataType]].
 *
 * @param inputType The DataType to extract the field positions from.
 * @return An array holding the field positions.
| */ |
| def getFieldIndices(inputType: DataType): Array[Int] = { |
| getFieldNames(inputType).indices.toArray |
| } |
| |
| /** |
 * Returns field types for a given [[DataType]].
| * |
| * @param inputType The DataType to extract field types from. |
| * @return An array holding the field types. |
| */ |
| def getFieldTypes(inputType: DataType): Array[InternalType] = { |
| validateType(inputType) |
| |
| inputType.toInternalType match { |
| case ct: RowType => |
| 0.until(ct.getArity).map(i => ct.getInternalTypeAt(i).toInternalType).toArray |
| case t: InternalType => Array(t) |
| } |
| } |
| |
| } |