blob: ae92e8fc17bc4874ef0800be00800c7d5a0f4c05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api.scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{StreamQueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.asScalaStream
import org.apache.flink.table.api.functions.{AggregateFunction, TableFunction}
/**
 * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]].
 *
 * A TableEnvironment can be used to:
 * - convert a [[DataStream]] to a [[Table]]
 * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
 * - register a [[Table]] in the [[TableEnvironment]]'s catalog
 * - register a [[TableFunction]] or an [[AggregateFunction]] in the [[TableEnvironment]]'s catalog
 * - scan a registered table to obtain a [[Table]]
 * - specify a SQL query on registered tables to obtain a [[Table]]
 * - convert a [[Table]] into a [[DataStream]]
 * - explain the AST and execution plan of a [[Table]]
 *
 * The Scala environment is unwrapped (`getWrappedStreamExecutionEnvironment`) and the resulting
 * Java environment is handed to the Java-API base class, which holds the actual logic.
 *
 * @param execEnv The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
 * @param config The configuration of the TableEnvironment.
 */
class StreamTableEnvironment(
    execEnv: StreamExecutionEnvironment,
    config: TableConfig)
  extends org.apache.flink.table.api.StreamTableEnvironment(
    execEnv.getWrappedStreamExecutionEnvironment,
    config) {

  /**
   * Converts the given [[DataStream]] into a [[Table]].
   *
   * The field names of the [[Table]] are automatically derived from the type of the
   * [[DataStream]].
   *
   * Internally the stream is registered under a freshly generated unique table name
   * (`createUniqueTableName()`) and that registration is then scanned.
   *
   * @param dataStream The [[DataStream]] to be converted.
   * @tparam T The type of the [[DataStream]].
   * @return The converted [[Table]].
   */
  def fromDataStream[T](dataStream: DataStream[T]): Table = {
    val name = createUniqueTableName()
    registerDataStreamInternal(name, dataStream.javaStream, false)
    scan(name)
  }

  /**
   * Converts the given [[DataStream]] into a [[Table]] with specified field names.
   *
   * Example:
   *
   * {{{
   *   val stream: DataStream[(String, Long)] = ...
   *   val tab: Table = tableEnv.fromDataStream(stream, 'a, 'b)
   * }}}
   *
   * Internally the stream is registered under a freshly generated unique table name
   * (`createUniqueTableName()`) and that registration is then scanned.
   *
   * @param dataStream The [[DataStream]] to be converted.
   * @param fields The field names of the resulting [[Table]].
   * @tparam T The type of the [[DataStream]].
   * @return The converted [[Table]].
   */
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
    val name = createUniqueTableName()
    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, false)
    scan(name)
  }

  /**
   * Registers the given [[DataStream]] as table in the
   * [[TableEnvironment]]'s catalog.
   * Registered tables can be referenced in SQL queries.
   *
   * The field names of the [[Table]] are automatically derived
   * from the type of the [[DataStream]].
   *
   * The name is validated with `checkValidTableName` before registration.
   *
   * @param name The name under which the [[DataStream]] is registered in the catalog.
   * @param dataStream The [[DataStream]] to register.
   * @tparam T The type of the [[DataStream]] to register.
   */
  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
    checkValidTableName(name)
    registerDataStreamInternal(name, dataStream.javaStream, false)
  }

  /**
   * Registers the given [[DataStream]] as table with specified field names in the
   * [[TableEnvironment]]'s catalog.
   * Registered tables can be referenced in SQL queries.
   *
   * Example:
   *
   * {{{
   *   val stream: DataStream[(String, Long)] = ...
   *   tableEnv.registerDataStream("myTable", stream, 'a, 'b)
   * }}}
   *
   * The name is validated with `checkValidTableName` before registration.
   *
   * @param name The name under which the [[DataStream]] is registered in the catalog.
   * @param dataStream The [[DataStream]] to register.
   * @param fields The field names of the registered table.
   * @tparam T The type of the [[DataStream]] to register.
   */
  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
    checkValidTableName(name)
    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, false)
  }

  /**
   * Registers the given [[DataStream]] as table with specified field names in the
   * [[TableEnvironment]]'s catalog, or replaces an existing registration.
   * Registered tables can be referenced in SQL queries.
   *
   * This is identical to `registerDataStream(name, dataStream, fields: _*)` except that the
   * final flag passed to `registerDataStreamInternal` is `true` instead of `false`.
   * NOTE(review): that flag presumably allows an existing table registered under `name` to be
   * replaced instead of rejected. Confirm against `registerDataStreamInternal`.
   *
   * Example:
   *
   * {{{
   *   val stream: DataStream[(String, Long)] = ...
   *   tableEnv.registerOrReplaceDataStream("myTable", stream, 'a, 'b)
   * }}}
   *
   * The name is validated with `checkValidTableName` before registration.
   *
   * @param name The name under which the [[DataStream]] is registered in the catalog.
   * @param dataStream The [[DataStream]] to register.
   * @param fields The field names of the registered table.
   * @tparam T The type of the [[DataStream]] to register.
   */
  def registerOrReplaceDataStream[T](
      name: String,
      dataStream: DataStream[T],
      fields: Expression*): Unit = {
    checkValidTableName(name)
    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, true)
  }

  /**
   * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
   *
   * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
   * by update or delete changes, the conversion will fail.
   *
   * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
   * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
   * types must match.
   * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
   *
   * NOTE: This method only supports conversion of append-only tables. In order to make this
   * more explicit in the future, please use [[toAppendStream()]] instead.
   * If add and retract messages are required, use [[toRetractStream()]].
   *
   * @param table The [[Table]] to convert.
   * @tparam T The type of the resulting [[DataStream]].
   * @return The converted [[DataStream]].
   */
  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
    " more explicit in the future, please use toAppendStream() instead.")
  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = toAppendStream(table)

  /**
   * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
   *
   * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
   * by update or delete changes, the conversion will fail.
   *
   * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
   * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
   * types must match.
   * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
   *
   * NOTE: This method only supports conversion of append-only tables. In order to make this
   * more explicit in the future, please use [[toAppendStream()]] instead.
   * If add and retract messages are required, use [[toRetractStream()]].
   *
   * @param table The [[Table]] to convert.
   * @param queryConfig The configuration of the query to generate.
   * @tparam T The type of the resulting [[DataStream]].
   * @return The converted [[DataStream]].
   */
  @deprecated("This method only supports conversion of append-only tables. In order to make this" +
    " more explicit in the future, please use toAppendStream() instead.")
  def toDataStream[T: TypeInformation](
      table: Table,
      queryConfig: StreamQueryConfig): DataStream[T] = toAppendStream(table, queryConfig)

  /**
   * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
   *
   * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
   * by update or delete changes, the conversion will fail.
   *
   * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
   * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
   * types must match.
   * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
   *
   * @param table The [[Table]] to convert.
   * @tparam T The type of the resulting [[DataStream]].
   * @return The converted [[DataStream]].
   */
  def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
    val returnType = createTypeInformation[T]
    asScalaStream(translateToDataStream[T](
      table,
      updatesAsRetraction = false,
      withChangeFlag = false,
      returnType))
  }

  /**
   * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
   *
   * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
   * by update or delete changes, the conversion will fail.
   *
   * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
   * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
   * types must match.
   * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
   *
   * NOTE: the query configuration is applied via `queryConfig.overrideTableConfig(getConfig)`,
   * which receives this environment's own table config rather than a per-query copy.
   * NOTE(review): the override therefore presumably also affects later translations in this
   * environment. Confirm.
   *
   * @param table The [[Table]] to convert.
   * @param queryConfig The configuration of the query to generate.
   * @tparam T The type of the resulting [[DataStream]].
   * @return The converted [[DataStream]].
   */
  def toAppendStream[T: TypeInformation](
      table: Table,
      queryConfig: StreamQueryConfig): DataStream[T] = {
    queryConfig.overrideTableConfig(getConfig)
    toAppendStream(table)
  }

  /**
   * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
   * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
   * the second field holds the record of the specified type `T`.
   *
   * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
   *
   * @param table The [[Table]] to convert.
   * @tparam T The type of the requested data type.
   * @return The converted [[DataStream]].
   */
  def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
    val returnType = createTypeInformation[(Boolean, T)]
    asScalaStream(translateToDataStream[(Boolean, T)](
      table,
      updatesAsRetraction = true,
      withChangeFlag = true,
      returnType))
  }

  /**
   * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
   * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
   * the second field holds the record of the specified type `T`.
   *
   * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
   *
   * NOTE: the query configuration is applied via `queryConfig.overrideTableConfig(getConfig)`,
   * which receives this environment's own table config rather than a per-query copy.
   * NOTE(review): the override therefore presumably also affects later translations in this
   * environment. Confirm.
   *
   * @param table The [[Table]] to convert.
   * @param queryConfig The configuration of the query to generate.
   * @tparam T The type of the requested data type.
   * @return The converted [[DataStream]].
   */
  def toRetractStream[T: TypeInformation](
      table: Table,
      queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
    queryConfig.overrideTableConfig(getConfig)
    toRetractStream(table)
  }

  /**
   * Registers a [[TableFunction]] under the given name in the TableEnvironment's catalog.
   * Registered functions can be referenced in SQL queries.
   *
   * @param name The name under which the function is registered.
   * @param tf The TableFunction to register.
   * @tparam T The type of the rows emitted by the table function.
   */
  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
    registerTableFunctionInternal[T](name, tf)
  }

  /**
   * Registers an [[AggregateFunction]] under the given name in the TableEnvironment's catalog.
   * Registered functions can be referenced in Table API and SQL queries.
   *
   * @param name The name under which the function is registered.
   * @param f The AggregateFunction to register.
   * @tparam T The type of the output value.
   * @tparam ACC The type of aggregate accumulator.
   */
  def registerFunction[T: TypeInformation, ACC: TypeInformation](
      name: String,
      f: AggregateFunction[T, ACC])
    : Unit = {
    registerAggregateFunctionInternal[T, ACC](name, f)
  }
}