blob: ddb3871ccac0570cda8d3791b3877b31711b82a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.calcite
import java.util
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`._
import org.apache.calcite.sql.SqlIntervalQualifier
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, _}
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, PojoField => _}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.types._
import org.apache.flink.table.api.{TableException, TableSchema}
import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
import org.apache.flink.table.dataformat.{BaseRow, Decimal}
import org.apache.flink.table.plan.schema._
import org.apache.flink.table.typeutils._
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
* and Calcite's [[RelDataType]].
*/
class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
// NOTE: for future data types it might be necessary to
// override more methods of RelDataTypeFactoryImpl
private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => false
case _: TimeIndicatorTypeInfo => false
case _: BasicTypeInfo[_] => false
case _: SqlTimeTypeInfo[_] => false
case _: TimeIntervalTypeInfo[_] => false
case _ => true
}
def createTypeFromInternalType(
t: InternalType,
isNullable: Boolean)
: RelDataType = {
createTypeFromTypeInfo(t, isNullable)
}
def createTypeFromTypeInfo(
typeInfo: TypeInformation[_],
isNullable: Boolean)
: RelDataType = {
// we cannot use seenTypes for simple types,
// because time indicators and timestamps would be the same
val relType = if (!isAdvanced(typeInfo)) {
// simple types can be converted to SQL types and vice versa
val sqlType = typeInfoToSqlTypeName(typeInfo)
sqlType match {
case DECIMAL =>
val dec = typeInfo.asInstanceOf[BigDecimalTypeInfo]
createSqlType(DECIMAL, dec.precision, dec.scale)
case INTERVAL_YEAR_MONTH =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
case INTERVAL_DAY_SECOND =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
createRowtimeIndicatorType()
} else {
createProctimeIndicatorType()
}
case _ =>
createSqlType(sqlType)
}
} else {
// advanced types require specific RelDataType
// for storing the original TypeInformation
seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
}
createTypeWithNullability(relType, isNullable)
}
/**
* Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
*/
def createProctimeIndicatorType(): RelDataType = {
val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
originalType.asInstanceOf[BasicSqlType],
isEventTime = false)
)
}
/**
* Creates a indicator type for event-time, but with similar properties as SQL timestamp.
*/
def createRowtimeIndicatorType(): RelDataType = {
val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
originalType.asInstanceOf[BasicSqlType],
isEventTime = true)
)
}
/**
* Creates types that create custom [[RelDataType]]s that wrap Flink's [[TypeInformation]].
*/
private def createAdvancedType(
typeInfo: TypeInformation[_],
isNullable: Boolean): RelDataType = {
val relType = typeInfo match {
case ct: CompositeType[_] =>
new CompositeRelDataType(ct, isNullable, this)
case pa: PrimitiveArrayTypeInfo[_] =>
new ArrayRelDataType(
pa,
createTypeFromTypeInfo(pa.getComponentType, isNullable = false),
isNullable)
case ba: BasicArrayTypeInfo[_, _] =>
new ArrayRelDataType(
ba,
createTypeFromTypeInfo(ba.getComponentInfo, isNullable = true),
isNullable)
case oa: ObjectArrayTypeInfo[_, _] =>
new ArrayRelDataType(
oa,
createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
isNullable)
case mp: MapTypeInfo[_, _] =>
new MapRelDataType(
mp,
createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
isNullable)
case mts: MultisetTypeInfo[_] =>
new MultisetRelDataType(
mts,
createTypeFromTypeInfo(mts.getElementTypeInfo, isNullable = true),
isNullable
)
case ti: TypeInformation[_] =>
new GenericRelDataType(
ti,
isNullable,
getTypeSystem.asInstanceOf[FlinkTypeSystem])
}
canonize(relType)
}
/**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
* @return a struct type with the input fieldNames, input fieldTypes, and system fields
*/
def buildLogicalRowType(
fieldNames: Seq[String],
fieldTypes: Seq[TypeInformation[_]])
: RelDataType = {
buildLogicalRowType(
fieldNames,
fieldTypes,
fieldTypes.map(!FlinkTypeFactory.isTimeIndicatorType(_)))
}
/**
* Creates a struct type with the input fieldNames, input fieldTypes and input fieldNullables
* using FlinkTypeFactory
*
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
* @param fieldNullables field nullable properties
* @return a struct type with the input fieldNames, input fieldTypes, and system fields
*/
def buildLogicalRowType(
fieldNames: Seq[String],
fieldTypes: Seq[TypeInformation[_]],
fieldNullables: Seq[Boolean])
: RelDataType = {
val logicalRowTypeBuilder = builder
val fields = fieldNames.zip(fieldTypes).zip(fieldNullables)
fields foreach {
case ((fieldName, fieldType), fieldNullable) =>
if (FlinkTypeFactory.isTimeIndicatorType(fieldType) && fieldNullable) {
throw new TableException(
s"$fieldName can not be nullable because it is TimeIndicatorType!")
}
logicalRowTypeBuilder.add(fieldName, createTypeFromTypeInfo(fieldType, fieldNullable))
}
logicalRowTypeBuilder.build
}
/**
* Created a struct type with the input table schema using FlinkTypeFactory
* @param tableSchema the table schema
* @return a struct type with the input fieldNames, input fieldTypes, and system fields
*/
def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Boolean): RelDataType = {
buildRelDataType(
tableSchema.getFieldNames.toSeq,
tableSchema.getFieldTypes map {
case DataTypes.PROCTIME_INDICATOR if !isStreaming => DataTypes.TIMESTAMP
case DataTypes.ROWTIME_INDICATOR if !isStreaming => DataTypes.TIMESTAMP
case tpe: InternalType => tpe
},
tableSchema.getFieldNullables)
}
def buildRelDataType(
fieldNames: Seq[String],
fieldTypes: Seq[InternalType])
: RelDataType = {
buildRelDataType(
fieldNames,
fieldTypes,
fieldTypes.map(!FlinkTypeFactory.isTimeIndicatorType(_)))
}
def buildRelDataType(
fieldNames: Seq[String],
fieldTypes: Seq[InternalType],
fieldNullables: Seq[Boolean])
: RelDataType = {
val b = builder
val fields = fieldNames.zip(fieldTypes).zip(fieldNullables)
fields foreach {
case ((fieldName, fieldType), fieldNullable) =>
if (FlinkTypeFactory.isTimeIndicatorType(fieldType) && fieldNullable) {
throw new TableException(
s"$fieldName can not be nullable because it is TimeIndicatorType!")
}
b.add(fieldName, createTypeFromInternalType(fieldType, fieldNullable))
}
b.build
}
// ----------------------------------------------------------------------------------------------
override def getJavaClass(`type`: RelDataType): java.lang.reflect.Type = {
if (`type`.getSqlTypeName == FLOAT) {
if (`type`.isNullable) {
classOf[java.lang.Float]
} else {
java.lang.Float.TYPE
}
} else {
super.getJavaClass(`type`)
}
}
override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
// it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
// Calcite will limit the length of the VARCHAR type to 65536.
if (typeName == VARCHAR && precision < 0) {
createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
} else {
super.createSqlType(typeName, precision)
}
}
override def createSqlType(typeName: SqlTypeName): RelDataType = {
if (typeName == DECIMAL) {
// if we got here, the precision and scale are not specified, here we
// keep precision/scale in sync with our type system's default value,
// see DecimalType.USER_DEFAULT.
createSqlType(typeName, DecimalType.USER_DEFAULT.precision(),
DecimalType.USER_DEFAULT.scale())
} else {
super.createSqlType(typeName)
}
}
override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
val arrayType = FlinkTypeFactory.toInternalType(elementType) match {
case DataTypes.BOOLEAN => BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
case DataTypes.SHORT => BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
case DataTypes.INT => BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
case DataTypes.LONG => BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
case DataTypes.FLOAT => BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
case DataTypes.DOUBLE => BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
case DataTypes.CHAR => BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
case DataTypes.STRING => BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
// object
case _ => ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType))
}
val relType = new ArrayRelDataType(
arrayType,
elementType,
isNullable = false)
canonize(relType)
}
override def createMapType(keyType: RelDataType, valueType: RelDataType): RelDataType = {
val relType = new MapRelDataType(
new MapTypeInfo(
FlinkTypeFactory.toTypeInfo(keyType),
FlinkTypeFactory.toTypeInfo(valueType)),
keyType,
valueType,
isNullable = false)
canonize(relType)
}
override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
val relType = new MultisetRelDataType(
MultisetTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
elementType,
isNullable = false)
canonize(relType)
}
override def createTypeWithNullability(
relDataType: RelDataType,
isNullable: Boolean): RelDataType = {
// nullability change not necessary
if (relDataType.isNullable == isNullable) {
return canonize(relDataType)
}
// change nullability
val newType = relDataType match {
case composite: CompositeRelDataType =>
new CompositeRelDataType(composite.compositeType, isNullable, this)
case array: ArrayRelDataType =>
new ArrayRelDataType(array.typeInfo, array.getComponentType, isNullable)
case map: MapRelDataType =>
new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)
case multiSet: MultisetRelDataType =>
new MultisetRelDataType(multiSet.typeInfo, multiSet.getComponentType, isNullable)
case generic: GenericRelDataType =>
new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)
case timeIndicator: TimeIndicatorRelDataType =>
timeIndicator
case _ =>
super.createTypeWithNullability(relDataType, isNullable)
}
canonize(newType)
}
override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
val type0 = types.get(0)
if (type0.getSqlTypeName != null) {
val resultType = resolveAllIdenticalTypes(types)
if (resultType.isDefined) {
// result type for identical types
return resultType.get
}
}
// fall back to super
super.leastRestrictive(types)
}
private def resolveAllIdenticalTypes(types: util.List[RelDataType]): Option[RelDataType] = {
val allTypes = types.asScala
val head = allTypes.head
// check if all types are the same
if (allTypes.forall(_ == head)) {
// types are the same, check nullability
val nullable = allTypes
.exists(sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL)
// return type with nullability
Some(createTypeWithNullability(head, nullable))
} else {
// types are not all the same
if (allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
// one of the type was ANY.
// we cannot generate a common type if it differs from other types.
throw new TableException("Generic ANY types must have a common type information.")
} else {
// cannot resolve a common type for different input types
None
}
}
}
// Calcite's default impl for division is apparently borrowed from T-SQL,
// but the details are a little different, e.g. when Decimal(34,0)/Decimal(10,0)
// To avoid confusion, follow the exact T-SQL behavior.
// Note that for (+-*), Calcite is also different from T-SQL;
// however, Calcite conforms to SQL2003 while T-SQL does not.
// therefore we keep Calcite's behavior on (+-*)
override def createDecimalQuotient(type1: RelDataType, type2: RelDataType): RelDataType = {
if (SqlTypeUtil.isExactNumeric(type1) && SqlTypeUtil.isExactNumeric(type2) &&
(SqlTypeUtil.isDecimal(type1) || SqlTypeUtil.isDecimal(type2))) {
val result = Decimal.inferDivisionType(
type1.getPrecision, type1.getScale,
type2.getPrecision, type2.getScale)
createSqlType(SqlTypeName.DECIMAL, result.precision, result.scale)
}
else {
null
}
}
}
object FlinkTypeFactory {
private[flink] def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName =
typeInfo match {
case BOOLEAN_TYPE_INFO => BOOLEAN
case BYTE_TYPE_INFO => TINYINT
case SHORT_TYPE_INFO => SMALLINT
case INT_TYPE_INFO => INTEGER
case LONG_TYPE_INFO => BIGINT
case FLOAT_TYPE_INFO => FLOAT
case DOUBLE_TYPE_INFO => DOUBLE
case STRING_TYPE_INFO => VARCHAR
case _: BigDecimalTypeInfo => DECIMAL
// temporal types
case SqlTimeTypeInfo.DATE => DATE
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => VARBINARY
case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
throw new TableException("Character type is not supported.")
case _@t =>
throw new TableException(s"Type is not supported: $t")
}
/**
* Converts a Calcite logical record into a Flink type information.
*/
@deprecated("Use the RowSchema class instead because it handles both logical and physical rows.")
def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
// convert to type information
val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
FlinkTypeFactory.toTypeInfo(relDataType.getType)
}
// field names
val logicalFieldNames = logicalRowType.getFieldNames.asScala
new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
}
/**
* Converts a Calcite logical record into a Flink BaseRow information.
*/
def toInternalBaseRowTypeInfo(logicalRowType: RelDataType): BaseRowTypeInfo = {
// convert to type information
val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
FlinkTypeFactory.toTypeInfo(relDataType.getType)
}
// field names
val logicalFieldNames = logicalRowType.getFieldNames.asScala
new BaseRowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
}
def toInternalRowType(logicalRowType: RelDataType): RowType = {
// convert to type information
val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
FlinkTypeFactory.toInternalType(relDataType.getType)
}
// field names
val logicalFieldNames = logicalRowType.getFieldNames.asScala
new RowType(logicalFieldTypes.toArray[DataType], logicalFieldNames.toArray)
}
def toInternalFieldTypes(logicalRowType: RelDataType): Seq[InternalType] = {
logicalRowType.getFieldList.asScala map { relDataType =>
FlinkTypeFactory.toInternalType(relDataType.getType)
}
}
def newBaseRowTypeInfo(
types: Array[TypeInformation[_]]): BaseRowTypeInfo = {
new BaseRowTypeInfo(types: _*)
}
def isProctimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
case ti: TimeIndicatorRelDataType if !ti.isEventTime => true
case _ => false
}
def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true
case _ => false
}
def isProctimeIndicatorType(dataType: DataType): Boolean = {
isProctimeIndicatorType(TypeConverters.createExternalTypeInfoFromDataType(dataType))
}
def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
case ti: TimeIndicatorRelDataType if ti.isEventTime => true
case _ => false
}
def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
case ti: TimeIndicatorTypeInfo if ti.isEventTime => true
case _ => false
}
def isRowtimeIndicatorType(dataType: DataType): Boolean = {
isRowtimeIndicatorType(dataType)
}
def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
case _: TimeIndicatorRelDataType => true
case _ => false
}
def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
case _: TimeIndicatorTypeInfo => true
case _ => false
}
def isTimeIndicatorType(t: InternalType): Boolean = t match {
case DataTypes.ROWTIME_INDICATOR | DataTypes.PROCTIME_INDICATOR => true
case _ => false
}
def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
case BOOLEAN => BOOLEAN_TYPE_INFO
case TINYINT => BYTE_TYPE_INFO
case SMALLINT => SHORT_TYPE_INFO
case INTEGER => INT_TYPE_INFO
case BIGINT => LONG_TYPE_INFO
case FLOAT => FLOAT_TYPE_INFO
case DOUBLE => DOUBLE_TYPE_INFO
case VARCHAR | CHAR => STRING_TYPE_INFO
case DECIMAL => BigDecimalTypeInfo.of(relDataType.getPrecision, relDataType.getScale)
// time indicators
case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
if (indicator.isEventTime) {
TimeIndicatorTypeInfo.ROWTIME_INDICATOR
} else {
TimeIndicatorTypeInfo.PROCTIME_INDICATOR
}
// temporal types
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS
case NULL =>
throw new TableException(
"Type NULL is not supported. Null values must have a supported type.")
// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
// are represented as integer
case SYMBOL => INT_TYPE_INFO
// extract encapsulated TypeInformation
case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
genericRelDataType.typeInfo
case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
compositeRelDataType.compositeType
case ROW if relDataType.isInstanceOf[RelRecordType] =>
val relRecordType = relDataType.asInstanceOf[RelRecordType]
new BaseRowSchema(relRecordType).typeInfo()
// CURSOR for UDTF case, whose type info will never be used, just a placeholder
case CURSOR => new NothingTypeInfo
case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
arrayRelDataType.typeInfo
case MAP if relDataType.isInstanceOf[MapRelDataType] =>
val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
mapRelDataType.typeInfo
case VARBINARY | BINARY => PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
multisetRelDataType.typeInfo
case _@t =>
throw new TableException(s"Type is not supported: $t")
}
def toInternalType(relDataType: RelDataType): InternalType =
TypeConverters.createInternalTypeFromTypeInfo(FlinkTypeFactory.toTypeInfo(relDataType))
def toDataType(relDataType: RelDataType): DataType = FlinkTypeFactory.toTypeInfo(relDataType)
}