blob: cddf5e07042c454eefba676d4495f1d6ae292f71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api.types
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BOOLEAN_TYPE_INFO, BYTE_TYPE_INFO, CHAR_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO, INT_TYPE_INFO, LONG_TYPE_INFO, SHORT_TYPE_INFO, STRING_TYPE_INFO}
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, BigDecimalTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo, PojoField, PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.table.api.TableException
import org.apache.flink.table.typeutils.{BaseArrayTypeInfo, BaseMapTypeInfo, BaseRowTypeInfo, BinaryStringTypeInfo, DecimalTypeInfo, RowIntervalTypeInfo, TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
/**
* Type Converters:
* [[InternalType]] <=> [[TypeInformation]].
* [[DataType]] <=> [[TypeInformation]].
*/
object TypeConverters {

  /**
    * Create a [[InternalType]] from a [[TypeInformation]].
    * If you want to convert to [[DataType]], just use [[TypeInfoWrappedDataType]] to wrap it.
    *
    * <p>Note: Information may be lost. For example, after Pojo is converted to InternalType,
    * we no longer know that it is a Pojo and only think it is a Row.
    *
    * <p>Eg:
    * [[BasicTypeInfo#STRING_TYPE_INFO]] => [[DataTypes.STRING]].
    * [[BasicTypeInfo#BIG_DEC_TYPE_INFO]] => [[DecimalType]].
    * [[RowTypeInfo]] => [[RowType]].
    * [[PojoTypeInfo]] (CompositeType) => [[RowType]].
    * [[TupleTypeInfo]] (CompositeType) => [[RowType]].
    *
    * <p>The cases are order-sensitive: specific constants (e.g. the byte primitive array)
    * are matched before the more general type patterns that would also accept them.
    *
    * @param typeInfo the type information to convert
    * @return the matching [[InternalType]]; type infos that match no case are wrapped by
    *         [[DataTypes.createGenericType]]
    */
  def createInternalTypeFromTypeInfo(typeInfo: TypeInformation[_]): InternalType =
    typeInfo match {

      // built-in composite type info. (Need to be converted to RowType)
      case rt: RowTypeInfo =>
        new RowType(
          rt.getFieldTypes.map(new TypeInfoWrappedDataType(_)).toArray[DataType],
          rt.getFieldNames)

      case tt: TupleTypeInfo[_] =>
        new RowType(
          (0 until tt.getArity).map(tt.getTypeAt)
            .map(new TypeInfoWrappedDataType(_))
            .toArray[DataType],
          tt.getFieldNames)

      case pt: PojoTypeInfo[_] =>
        val fields = (0 until pt.getArity).map(pt.getPojoFieldAt)
        new RowType(
          fields.map { (field: PojoField) =>
            new TypeInfoWrappedDataType(field.getTypeInformation)
          }.toArray[DataType],
          fields.map { (field: PojoField) => field.getField.getName }.toArray)

      case cs: CaseClassTypeInfo[_] =>
        new RowType(
          (0 until cs.getArity).map(cs.getTypeAt)
            .map(new TypeInfoWrappedDataType(_))
            .toArray[DataType],
          cs.fieldNames.toArray)

      // primitive types
      case BOOLEAN_TYPE_INFO => DataTypes.BOOLEAN
      case BYTE_TYPE_INFO => DataTypes.BYTE
      case SHORT_TYPE_INFO => DataTypes.SHORT
      case INT_TYPE_INFO => DataTypes.INT
      case LONG_TYPE_INFO => DataTypes.LONG
      case FLOAT_TYPE_INFO => DataTypes.FLOAT
      case DOUBLE_TYPE_INFO => DataTypes.DOUBLE
      case CHAR_TYPE_INFO => DataTypes.CHAR

      case STRING_TYPE_INFO | BinaryStringTypeInfo.INSTANCE => DataTypes.STRING
      case dt: BigDecimalTypeInfo => DataTypes.createDecimalType(dt.precision, dt.scale)
      case dt: DecimalTypeInfo => DataTypes.createDecimalType(dt.precision, dt.scale)
      // Must stay ahead of the general PrimitiveArrayTypeInfo case below.
      case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => DataTypes.BYTE_ARRAY

      // temporal types
      case TimeIntervalTypeInfo.INTERVAL_MONTHS => DataTypes.INTERVAL_MONTHS
      case TimeIntervalTypeInfo.INTERVAL_MILLIS => DataTypes.INTERVAL_MILLIS
      case RowIntervalTypeInfo.INTERVAL_ROWS => DataTypes.INTERVAL_ROWS

      // time indicators
      // Checked before the plain TIMESTAMP case, which would otherwise accept them as well.
      case SqlTimeTypeInfo.TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
        val indicator = typeInfo.asInstanceOf[TimeIndicatorTypeInfo]
        if (indicator.isEventTime) {
          DataTypes.ROWTIME_INDICATOR
        } else {
          DataTypes.PROCTIME_INDICATOR
        }

      case SqlTimeTypeInfo.DATE => DataTypes.DATE
      case SqlTimeTypeInfo.TIME => DataTypes.TIME
      case SqlTimeTypeInfo.TIMESTAMP => DataTypes.TIMESTAMP

      // arrays and map types
      case pa: PrimitiveArrayTypeInfo[_] =>
        DataTypes.createPrimitiveArrayType(new TypeInfoWrappedDataType(pa.getComponentType))

      case ba: BasicArrayTypeInfo[_, _] =>
        DataTypes.createArrayType(new TypeInfoWrappedDataType(ba.getComponentInfo))

      case oa: ObjectArrayTypeInfo[_, _] =>
        DataTypes.createArrayType(new TypeInfoWrappedDataType(oa.getComponentInfo))

      case pa: BaseArrayTypeInfo =>
        new ArrayType(pa.getEleType, pa.isPrimitive)

      case mp: MultisetTypeInfo[_] =>
        DataTypes.createMultisetType(new TypeInfoWrappedDataType(mp.getElementTypeInfo))

      case mp: MapTypeInfo[_, _] =>
        DataTypes.createMapType(
          new TypeInfoWrappedDataType(mp.getKeyTypeInfo),
          new TypeInfoWrappedDataType(mp.getValueTypeInfo))

      case mp: BaseMapTypeInfo =>
        DataTypes.createMapType(mp.getKeyType, mp.getValueType)

      case br: BaseRowTypeInfo =>
        new RowType(
          br.getFieldTypes.map(new TypeInfoWrappedDataType(_)).toArray[DataType],
          br.getFieldNames)

      // unknown type info, treat as generic.
      case _ => DataTypes.createGenericType(typeInfo)
    }

  /**
    * Create a TypeInformation from DataType.
    *
    * <p>eg:
    * [[DataTypes.STRING]] => [[BasicTypeInfo.STRING_TYPE_INFO]].
    * [[RowType]] => [[RowTypeInfo]].
    *
    * @param t the data type to convert; may be null
    * @return the external [[TypeInformation]] for `t`, or null when `t` is null
    * @throws TableException if `t`, or the element type of a primitive array, is not supported
    */
  def createExternalTypeInfoFromDataType(t: DataType): TypeInformation[_] = t match {
    // A null data type is passed through as a null type info.
    case null => null

    // primitive types
    case DataTypes.BOOLEAN => BOOLEAN_TYPE_INFO
    case DataTypes.BYTE => BYTE_TYPE_INFO
    case DataTypes.SHORT => SHORT_TYPE_INFO
    case DataTypes.INT => INT_TYPE_INFO
    case DataTypes.LONG => LONG_TYPE_INFO
    case DataTypes.FLOAT => FLOAT_TYPE_INFO
    case DataTypes.DOUBLE => DOUBLE_TYPE_INFO
    case DataTypes.CHAR => CHAR_TYPE_INFO

    case _: StringType => STRING_TYPE_INFO
    case dt: DecimalType => BigDecimalTypeInfo.of(dt.precision, dt.scale)
    case DataTypes.BYTE_ARRAY => BYTE_PRIMITIVE_ARRAY_TYPE_INFO

    // temporal types
    case DataTypes.INTERVAL_MONTHS => TimeIntervalTypeInfo.INTERVAL_MONTHS
    case DataTypes.INTERVAL_MILLIS => TimeIntervalTypeInfo.INTERVAL_MILLIS
    case DataTypes.ROWTIME_INDICATOR => TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    case DataTypes.PROCTIME_INDICATOR => TimeIndicatorTypeInfo.PROCTIME_INDICATOR
    case DataTypes.INTERVAL_ROWS => RowIntervalTypeInfo.INTERVAL_ROWS

    case DataTypes.DATE => SqlTimeTypeInfo.DATE
    case DataTypes.TIME => SqlTimeTypeInfo.TIME
    case DataTypes.TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP

    // arrays and map types
    case at: ArrayType if at.isPrimitive => at.getElementInternalType match {
      case DataTypes.BOOLEAN => PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.BYTE => BYTE_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.SHORT => PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.INT => PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.LONG => PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.FLOAT => PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.DOUBLE => PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
      case DataTypes.CHAR => PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
      // Fail with a descriptive TableException instead of a bare scala.MatchError.
      case other =>
        throw new TableException(s"Primitive array element type is not supported: $other")
    }

    case at: ArrayType => at.getElementInternalType match {
      case DataTypes.BOOLEAN => BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO
      case DataTypes.SHORT => BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO
      case DataTypes.INT => BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
      case DataTypes.LONG => BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO
      case DataTypes.FLOAT => BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO
      case DataTypes.DOUBLE => BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO
      case DataTypes.CHAR => BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO
      case DataTypes.STRING => BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO

      // object: every other element type is converted recursively.
      case _ =>
        ObjectArrayTypeInfo.getInfoFor(createExternalTypeInfoFromDataType(at.getElementType))
    }

    // The multiset's key type is what gets handed over as the element type info.
    case mp: MultisetType =>
      new MultisetTypeInfo(createExternalTypeInfoFromDataType(mp.getKeyType))

    case mp: MapType =>
      new MapTypeInfo(
        createExternalTypeInfoFromDataType(mp.getKeyType),
        createExternalTypeInfoFromDataType(mp.getValueType))

    // composite types
    case br: RowType =>
      new RowTypeInfo(
        br.getFieldTypes.map(createExternalTypeInfoFromDataType),
        br.getFieldNames)

    case gt: GenericType[_] => gt.getTypeInfo

    case et: TypeInfoWrappedDataType => et.getTypeInfo

    case _ =>
      throw new TableException(s"Type is not supported: $t")
  }

  /**
    * Create an internal [[TypeInformation]] from a [[DataType]].
    *
    * <p>eg:
    * [[DataTypes.STRING]] => [[BinaryStringTypeInfo]].
    * [[RowType]] => [[BaseRowTypeInfo]].
    *
    * <p>Unlike [[createExternalTypeInfoFromDataType]], DATE and TIME map to
    * [[BasicTypeInfo.INT_TYPE_INFO]] and TIMESTAMP maps to [[BasicTypeInfo.LONG_TYPE_INFO]].
    *
    * @param t the data type to convert; may be null
    * @return the internal [[TypeInformation]] for `t`, or null when `t` is null
    * @throws TableException if `t` is not supported
    */
  def createInternalTypeInfoFromDataType(t: DataType): TypeInformation[_] = t match {
    // A null data type is passed through as a null type info.
    case null => null

    // primitive types
    case DataTypes.BOOLEAN => BOOLEAN_TYPE_INFO
    case DataTypes.BYTE => BYTE_TYPE_INFO
    case DataTypes.SHORT => SHORT_TYPE_INFO
    case DataTypes.INT => INT_TYPE_INFO
    case DataTypes.LONG => LONG_TYPE_INFO
    case DataTypes.FLOAT => FLOAT_TYPE_INFO
    case DataTypes.DOUBLE => DOUBLE_TYPE_INFO
    case DataTypes.CHAR => CHAR_TYPE_INFO

    case _: StringType => BinaryStringTypeInfo.INSTANCE
    case dt: DecimalType => DecimalTypeInfo.of(dt.precision, dt.scale)
    case DataTypes.BYTE_ARRAY => BYTE_PRIMITIVE_ARRAY_TYPE_INFO

    // temporal types
    case DataTypes.INTERVAL_MONTHS => TimeIntervalTypeInfo.INTERVAL_MONTHS
    case DataTypes.INTERVAL_MILLIS => TimeIntervalTypeInfo.INTERVAL_MILLIS
    case DataTypes.ROWTIME_INDICATOR => TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    case DataTypes.PROCTIME_INDICATOR => TimeIndicatorTypeInfo.PROCTIME_INDICATOR
    case DataTypes.INTERVAL_ROWS => RowIntervalTypeInfo.INTERVAL_ROWS

    case DataTypes.DATE => INT_TYPE_INFO
    case DataTypes.TIME => INT_TYPE_INFO
    case DataTypes.TIMESTAMP => LONG_TYPE_INFO

    // arrays and map types
    case at: ArrayType => new BaseArrayTypeInfo(at.isPrimitive, at.getElementType)

    case mp: MapType => new BaseMapTypeInfo(mp.getKeyType, mp.getValueType)

    // composite types
    case br: RowType => toBaseRowTypeInfo(br)

    case gt: GenericType[_] => gt.getTypeInfo

    // Unwrap to the internal type first, then convert that.
    case et: TypeInfoWrappedDataType => createInternalTypeInfoFromDataType(et.toInternalType)

    case _ =>
      throw new TableException(s"Type is not supported: $t")
  }

  /**
    * Convert every [[DataType]] of the given array with
    * [[createExternalTypeInfoFromDataType]], keeping the order.
    *
    * @param types the data types to convert
    * @return the external type infos, one per input element
    */
  def createExternalTypeInfoFromDataTypes(types: Array[DataType]): Array[TypeInformation[_]] =
    types.map(createExternalTypeInfoFromDataType)

  /**
    * Build a [[BaseRowTypeInfo]] from the field internal types and field names of a
    * [[RowType]].
    *
    * @param t the row type to convert
    * @return a [[BaseRowTypeInfo]] carrying the row's field names
    */
  def toBaseRowTypeInfo(t: RowType): BaseRowTypeInfo = {
    // NOTE(review): the field *internal* types are converted with the *external* converter;
    // confirm that BaseRowTypeInfo expects external field type infos.
    new BaseRowTypeInfo(
      t.getFieldInternalTypes.map(createExternalTypeInfoFromDataType),
      t.getFieldNames)
  }
}