blob: 55997302b38d5f3cd670c57218ca8a0da96e4f3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.typeutils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{ListTypeInfo, PojoTypeInfo}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.types._
import org.apache.flink.table.dataformat.BinaryRow
import org.apache.flink.table.validate._
/**
 * Predicates over [[InternalType]]s and validation helpers used when checking
 * expression operand types and key/state types.
 */
object TypeCheckUtils {

  /**
   * Returns true for INT, BYTE, SHORT, LONG, FLOAT, DOUBLE and any [[DecimalType]].
   */
  def isNumeric(dataType: InternalType): Boolean = dataType match {
    case DataTypes.INT | DataTypes.BYTE | DataTypes.SHORT
         | DataTypes.LONG | DataTypes.FLOAT | DataTypes.DOUBLE => true
    case _: DecimalType => true
    case _ => false
  }

  /**
   * Returns true if the type is either a time point or a time interval.
   */
  def isTemporal(dataType: InternalType): Boolean =
    isTimePoint(dataType) || isTimeInterval(dataType)

  /**
   * Returns true for [[TimeType]], [[DateType]] and [[TimestampType]] instances, except for
   * the two interval types, which are explicitly rejected first.
   */
  def isTimePoint(dataType: InternalType): Boolean = dataType match {
    // Intervals must be tested before the class-based cases below.
    // NOTE(review): presumably the interval constants are instances of one of those
    // classes -- confirm against DataTypes.
    case DataTypes.INTERVAL_MILLIS | DataTypes.INTERVAL_MONTHS => false
    case _: TimeType | _: DateType | _: TimestampType => true
    case _ => false
  }

  /**
   * Returns true if the type equals `DataTypes.ROWTIME_INDICATOR`.
   */
  def isRowTime(dataType: InternalType): Boolean =
    dataType == DataTypes.ROWTIME_INDICATOR

  /**
   * Returns true if the type equals `DataTypes.PROCTIME_INDICATOR`.
   */
  def isProcTime(dataType: InternalType): Boolean =
    dataType == DataTypes.PROCTIME_INDICATOR

  /**
   * Returns true for `DataTypes.INTERVAL_MILLIS` and `DataTypes.INTERVAL_MONTHS`.
   */
  def isTimeInterval(dataType: InternalType): Boolean = dataType match {
    case DataTypes.INTERVAL_MILLIS | DataTypes.INTERVAL_MONTHS => true
    case _ => false
  }

  /** Returns true if the type equals `DataTypes.STRING`. */
  def isString(dataType: InternalType): Boolean = dataType == DataTypes.STRING

  /** Returns true if the type equals `DataTypes.BYTE_ARRAY`. */
  def isBinary(dataType: InternalType): Boolean = dataType == DataTypes.BYTE_ARRAY

  /** Returns true if the type equals `DataTypes.BOOLEAN`. */
  def isBoolean(dataType: InternalType): Boolean = dataType == DataTypes.BOOLEAN

  /** Returns true if the type is a [[DecimalType]] instance. */
  def isDecimal(dataType: InternalType): Boolean = dataType.isInstanceOf[DecimalType]

  /** Returns true only for `DataTypes.INT` (not for the other integral widths). */
  def isInteger(dataType: InternalType): Boolean = dataType == DataTypes.INT

  /**
   * Returns true for INT, BYTE, LONG and SHORT.
   */
  def isIntegerFamily(dataType: InternalType): Boolean =
    dataType == DataTypes.INT ||
      dataType == DataTypes.BYTE ||
      dataType == DataTypes.LONG ||
      dataType == DataTypes.SHORT

  /**
   * Types that can be easily converted into a string without ambiguity:
   * numeric, string, temporal and boolean types.
   */
  def isSimpleStringRepresentation(dataType: InternalType): Boolean =
    isNumeric(dataType) || isString(dataType) || isTemporal(dataType) || isBoolean(dataType)

  /** Returns true if the type equals `DataTypes.LONG`. */
  def isLong(dataType: InternalType): Boolean = dataType == DataTypes.LONG

  /** Returns true if the type equals `DataTypes.INTERVAL_MONTHS`. */
  def isIntervalMonths(dataType: InternalType): Boolean = dataType == DataTypes.INTERVAL_MONTHS

  /** Returns true if the type equals `DataTypes.INTERVAL_MILLIS`. */
  def isIntervalMillis(dataType: InternalType): Boolean = dataType == DataTypes.INTERVAL_MILLIS

  /** Returns true if the type is an [[ArrayType]] instance. */
  def isArray(dataType: InternalType): Boolean = dataType.isInstanceOf[ArrayType]

  /** Returns true if the type is a [[MapType]] instance. */
  def isMap(dataType: InternalType): Boolean = dataType.isInstanceOf[MapType]

  /**
   * Returns true if the type is a [[GenericType]] whose wrapped type information
   * is a [[ListTypeInfo]].
   */
  def isList(dataType: InternalType): Boolean = dataType match {
    case gt: GenericType[_] => gt.getTypeInfo.isInstanceOf[ListTypeInfo[_]]
    case _ => false
  }

  /**
   * Returns true for BYTE, SHORT, INT and LONG.
   */
  def isIntegral(dataType: InternalType): Boolean = {
    dataType match {
      case DataTypes.BYTE
           | DataTypes.SHORT
           | DataTypes.INT
           | DataTypes.LONG => true
      case _ => false
    }
  }

  /**
   * Returns true unless the type is a [[GenericType]], a [[MapType]], a [[RowType]]
   * or an array type.
   */
  def isComparable(dataType: InternalType): Boolean =
    !dataType.isInstanceOf[GenericType[_]] &&
      !dataType.isInstanceOf[MapType] &&
      !dataType.isInstanceOf[RowType] &&
      !isArray(dataType)

  /**
   * Validates that the given type is numeric (see [[isNumeric]]).
   *
   * @param dataType type to be checked
   * @param caller   name of the calling expression, used in the failure message
   * @return [[ValidationSuccess]] for numeric types, otherwise a [[ValidationFailure]]
   */
  def assertNumericExpr(
      dataType: InternalType,
      caller: String)
    : ValidationResult = dataType match {
    case t if TypeCheckUtils.isNumeric(t) => ValidationSuccess
    case _ =>
      ValidationFailure(s"$caller requires numeric types, get $dataType here")
  }

  /**
   * Validates that the given type is integral (see [[isIntegral]]).
   *
   * @param dataType type to be checked
   * @param caller   name of the calling expression, used in the failure message
   * @return [[ValidationSuccess]] for integral types, otherwise a [[ValidationFailure]]
   */
  def assertIntegerFamilyExpr(
      dataType: InternalType,
      caller: String)
    : ValidationResult = dataType match {
    case t if TypeCheckUtils.isIntegral(t) => ValidationSuccess
    case _ =>
      ValidationFailure(s"$caller requires integer types but was '$dataType'.")
  }

  /**
   * Validates that the given type can be ordered. The type is converted with
   * `TypeConverters.createExternalTypeInfoFromDataType` and accepted if the resulting
   * type information reports `isSortKeyType`.
   *
   * @param dataType type to be checked
   * @param caller   name of the calling expression, used in the failure message
   * @return [[ValidationSuccess]] for orderable types, otherwise a [[ValidationFailure]]
   */
  def assertOrderableExpr(dataType: InternalType, caller: String): ValidationResult = {
    if (TypeConverters.createExternalTypeInfoFromDataType(dataType).isSortKeyType) {
      ValidationSuccess
    } else {
      ValidationFailure(s"$caller requires orderable types, get $dataType here")
    }
  }

  /**
   * Checks whether a type implements own hashCode() and equals() methods for storing an instance
   * in Flink's state or performing a keyBy operation.
   *
   * @param name name of the operation.
   * @param t type information to be validated
   * @throws ValidationException if a checked class inherits hashCode() or equals() from Object
   */
  def validateEqualsHashCode(name: String, t: TypeInformation[_]): Unit = t match {

    // make sure that a POJO class is a valid state type
    case pt: PojoTypeInfo[_] =>
      // we don't check the types recursively to give a chance of wrapping
      // proper hashCode/equals methods around an immutable type.
      // The POJO's own class must be inspected via getTypeClass; pt.getClass would be the
      // runtime class of the type-information object itself, not of the described POJO.
      validateEqualsHashCode(name, pt.getTypeClass)

    // BinaryRow direct hash in bytes, no need to check field types.
    case _: BaseRowTypeInfo => ()

    // recursively check composite types
    case ct: CompositeType[_] =>
      validateEqualsHashCode(name, t.getTypeClass)
      // we check recursively for entering Flink types such as tuples and rows
      for (i <- 0 until ct.getArity) {
        val subtype = ct.getTypeAt(i)
        validateEqualsHashCode(name, subtype)
      }

    // check other type information only based on the type class
    case _: TypeInformation[_] =>
      validateEqualsHashCode(name, t.getTypeClass)
  }

  /**
   * Checks whether a class implements own hashCode() and equals() methods for storing an instance
   * in Flink's state or performing a keyBy operation.
   *
   * Primitive classes are accepted as-is; for array classes the component type is checked
   * instead of the array class itself.
   *
   * @param name name of the operation
   * @param c class to be validated
   * @throws ValidationException if hashCode() or equals() is the one declared by Object
   */
  def validateEqualsHashCode(name: String, c: Class[_]): Unit = {

    // skip primitives
    if (!c.isPrimitive) {
      // check the component type of arrays
      if (c.isArray) {
        validateEqualsHashCode(name, c.getComponentType)
      }
      // check type for methods
      else {
        // NOTE(review): Class.getMethod throws NoSuchMethodException for interface types that
        // do not redeclare these methods -- confirm whether that case can reach this point.
        if (c.getMethod("hashCode").getDeclaringClass eq classOf[Object]) {
          throw new ValidationException(
            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
              s"does not implement a proper hashCode() method.")
        }
        if (c.getMethod("equals", classOf[Object]).getDeclaringClass eq classOf[Object]) {
          throw new ValidationException(
            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
              s"does not implement a proper equals() method.")
        }
      }
    }
  }
}