blob: 65ba6babb39d1510879059236a20b06459bbc57e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.typeutils
import org.apache.flink.table.api.types.{DataTypes, DecimalType, InternalType, TimestampType}
import org.apache.flink.table.typeutils.TypeCheckUtils._
/**
* Utilities for type conversions.
*/
object TypeCoercion {
val numericWideningPrecedence: IndexedSeq[InternalType] =
IndexedSeq(
DataTypes.BYTE,
DataTypes.SHORT,
DataTypes.INT,
DataTypes.LONG,
DataTypes.FLOAT,
DataTypes.DOUBLE)
def widerTypeOf(tp1: InternalType, tp2: InternalType): Option[InternalType] = {
(tp1, tp2) match {
case (ti1, ti2) if ti1 == ti2 => Some(ti1)
case (_, DataTypes.STRING) => Some(DataTypes.STRING)
case (DataTypes.STRING, _) => Some(DataTypes.STRING)
case (_, dt: DecimalType) => Some(dt)
case (dt: DecimalType, _) => Some(dt)
case (a, b) if isTimePoint(a) && isTimeInterval(b) => Some(a)
case (a, b) if isTimeInterval(a) && isTimePoint(b) => Some(b)
case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
Some(numericWideningPrecedence(higherIndex))
case _ => None
}
}
/**
* Test if we can do cast safely without lose of type.
*/
def canSafelyCast(from: InternalType, to: InternalType): Boolean = (from, to) match {
case (_, DataTypes.STRING) => true
case (a, _: DecimalType) if isNumeric(a) => true
case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
if (numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)) {
true
} else {
false
}
case _ => false
}
/**
* All the supported cast types in flink-table.
* Note: No distinction between explicit and implicit conversions
* Note: This is a subset of SqlTypeAssignmentRule
* Note: This may lose type during the cast.
*
*/
def canCast(from: InternalType, to: InternalType): Boolean = (from, to) match {
case (fromTp, toTp) if fromTp == toTp => true
case (_, DataTypes.STRING) => true
case (_, DataTypes.CHAR) => false // Character type not supported.
case (DataTypes.STRING, b) if isNumeric(b) => true
case (DataTypes.STRING, DataTypes.BOOLEAN) => true
case (DataTypes.STRING, _: DecimalType) => true
case (DataTypes.STRING, DataTypes.DATE) => true
case (DataTypes.STRING, DataTypes.TIME) => true
case (DataTypes.STRING, _: TimestampType) => true
case (DataTypes.BOOLEAN, b) if isNumeric(b) => true
case (DataTypes.BOOLEAN, _: DecimalType) => true
case (a, DataTypes.BOOLEAN) if isNumeric(a) => true
case (_: DecimalType, DataTypes.BOOLEAN) => true
case (a, b) if isNumeric(a) && isNumeric(b) => true
case (a, _: DecimalType) if isNumeric(a) => true
case (_: DecimalType, b) if isNumeric(b) => true
case (_: DecimalType, _: DecimalType) => true
case (DataTypes.INT, DataTypes.DATE) => true
case (DataTypes.INT, DataTypes.TIME) => true
case (DataTypes.BYTE, _: TimestampType) => true
case (DataTypes.SHORT, _: TimestampType) => true
case (DataTypes.INT, _: TimestampType) => true
case (DataTypes.LONG, _: TimestampType) => true
case (DataTypes.DOUBLE, _: TimestampType) => true
case (DataTypes.FLOAT, _: TimestampType) => true
case (DataTypes.INT, DataTypes.INTERVAL_MONTHS) => true
case (DataTypes.LONG, DataTypes.INTERVAL_MILLIS) => true
case (DataTypes.DATE, DataTypes.TIME) => false
case (DataTypes.TIME, DataTypes.DATE) => false
case (a, b) if isTimePoint(a) && isTimePoint(b) => true
case (DataTypes.DATE, DataTypes.INT) => true
case (DataTypes.TIME, DataTypes.INT) => true
case (_: TimestampType, DataTypes.BYTE) => true
case (_: TimestampType, DataTypes.INT) => true
case (_: TimestampType, DataTypes.SHORT) => true
case (_: TimestampType, DataTypes.LONG) => true
case (_: TimestampType, DataTypes.DOUBLE) => true
case (_: TimestampType, DataTypes.FLOAT) => true
case (DataTypes.INTERVAL_MONTHS, DataTypes.INT) => true
case (DataTypes.INTERVAL_MILLIS, DataTypes.LONG) => true
case _ => false
}
/**
* All the supported reinterpret types in flink-table.
*
*/
def canReinterpret(from: InternalType, to: InternalType): Boolean = (from, to) match {
case (fromTp, toTp) if fromTp == toTp => true
case (DataTypes.DATE, DataTypes.INT) => true
case (DataTypes.TIME, DataTypes.INT) => true
case (_: TimestampType, DataTypes.LONG) => true
case (DataTypes.INT, DataTypes.DATE) => true
case (DataTypes.INT, DataTypes.TIME) => true
case (DataTypes.LONG, _: TimestampType) => true
case (DataTypes.INT, DataTypes.INTERVAL_MONTHS) => true
case (DataTypes.LONG, DataTypes.INTERVAL_MILLIS) => true
case (DataTypes.INTERVAL_MONTHS, DataTypes.INT) => true
case (DataTypes.INTERVAL_MILLIS, DataTypes.LONG) => true
case (DataTypes.DATE, DataTypes.LONG) => true
case (DataTypes.TIME, DataTypes.LONG) => true
case (DataTypes.INTERVAL_MONTHS, DataTypes.LONG) => true
case _ => false
}
}