blob: ce91a644f4f076ddc4920520bca000e0f6158c5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeinfo;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BigIntComparator;
import org.apache.flink.api.common.typeutils.base.BigIntSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanComparator;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.ByteComparator;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.CharComparator;
import org.apache.flink.api.common.typeutils.base.CharSerializer;
import org.apache.flink.api.common.typeutils.base.DateComparator;
import org.apache.flink.api.common.typeutils.base.DateSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleComparator;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatComparator;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.ShortComparator;
import org.apache.flink.api.common.typeutils.base.ShortSerializer;
import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Type information for primitive types (int, long, double, byte, ...), String, Date, Void,
* BigInteger, and BigDecimal.
*/
@Public
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = -430955220409131770L;
public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<>(Boolean.class, new Class<?>[]{}, BooleanSerializer.INSTANCE, BooleanComparator.class);
public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new IntegerTypeInfo<>(Byte.class, new Class<?>[]{Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class}, ByteSerializer.INSTANCE, ByteComparator.class);
public static final BasicTypeInfo<Short> SHORT_TYPE_INFO = new IntegerTypeInfo<>(Short.class, new Class<?>[]{Integer.class, Long.class, Float.class, Double.class, Character.class}, ShortSerializer.INSTANCE, ShortComparator.class);
public static final BasicTypeInfo<Integer> INT_TYPE_INFO = new IntegerTypeInfo<>(Integer.class, new Class<?>[]{Long.class, Float.class, Double.class, Character.class}, IntSerializer.INSTANCE, IntComparator.class);
public static final BasicTypeInfo<Long> LONG_TYPE_INFO = new IntegerTypeInfo<>(Long.class, new Class<?>[]{Float.class, Double.class, Character.class}, LongSerializer.INSTANCE, LongComparator.class);
public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new FractionalTypeInfo<>(Float.class, new Class<?>[]{Double.class}, FloatSerializer.INSTANCE, FloatComparator.class);
public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new FractionalTypeInfo<>(Double.class, new Class<?>[]{}, DoubleSerializer.INSTANCE, DoubleComparator.class);
public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<>(Character.class, new Class<?>[]{}, CharSerializer.INSTANCE, CharComparator.class);
public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<>(Date.class, new Class<?>[]{}, DateSerializer.INSTANCE, DateComparator.class);
public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
public static final BasicTypeInfo<BigInteger> BIG_INT_TYPE_INFO = new BasicTypeInfo<>(BigInteger.class, new Class<?>[]{}, BigIntSerializer.INSTANCE, BigIntComparator.class);
public static final BasicTypeInfo<BigDecimal> BIG_DEC_TYPE_INFO = new BigDecimalTypeInfo(38, 18);
// --------------------------------------------------------------------------------------------
private final Class<T> clazz;
private final TypeSerializer<T> serializer;
private final Class<?>[] possibleCastTargetTypes;
private final Class<? extends TypeComparator<T>> comparatorClass;
protected BasicTypeInfo(Class<T> clazz, Class<?>[] possibleCastTargetTypes, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
this.clazz = checkNotNull(clazz);
this.possibleCastTargetTypes = checkNotNull(possibleCastTargetTypes);
this.serializer = checkNotNull(serializer);
// comparator can be null as in VOID_TYPE_INFO
this.comparatorClass = comparatorClass;
}
// --------------------------------------------------------------------------------------------
/**
* Returns whether this type should be automatically casted to
* the target type in an arithmetic operation.
*/
@PublicEvolving
public boolean shouldAutocastTo(BasicTypeInfo<?> to) {
for (Class<?> possibleTo: possibleCastTargetTypes) {
if (possibleTo.equals(to.getTypeClass())) {
return true;
}
}
return false;
}
@Override
@PublicEvolving
public boolean isBasicType() {
return true;
}
@Override
@PublicEvolving
public boolean isTupleType() {
return false;
}
@Override
@PublicEvolving
public int getArity() {
return 1;
}
@Override
@PublicEvolving
public int getTotalFields() {
return 1;
}
@Override
@PublicEvolving
public Class<T> getTypeClass() {
return this.clazz;
}
@Override
@PublicEvolving
public boolean isKeyType() {
return true;
}
@Override
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}
@Override
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
if (comparatorClass != null) {
return instantiateComparator(comparatorClass, sortOrderAscending);
} else {
throw new InvalidTypesException("The type " + clazz.getSimpleName() + " cannot be used as a key.");
}
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return (31 * Objects.hash(clazz, serializer, comparatorClass)) + Arrays.hashCode(possibleCastTargetTypes);
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof BasicTypeInfo;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BasicTypeInfo) {
@SuppressWarnings("unchecked")
BasicTypeInfo<T> other = (BasicTypeInfo<T>) obj;
return other.canEqual(this) &&
this.clazz == other.clazz &&
serializer.equals(other.serializer) &&
this.comparatorClass == other.comparatorClass;
} else {
return false;
}
}
@Override
public String toString() {
return clazz.getSimpleName();
}
// --------------------------------------------------------------------------------------------
@PublicEvolving
public static <X> BasicTypeInfo<X> getInfoFor(Class<X> type) {
if (type == null) {
throw new NullPointerException();
}
@SuppressWarnings("unchecked")
BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
return info;
}
private static <X> TypeComparator<X> instantiateComparator(Class<? extends TypeComparator<X>> comparatorClass, boolean ascendingOrder) {
try {
Constructor<? extends TypeComparator<X>> constructor = comparatorClass.getConstructor(boolean.class);
return constructor.newInstance(ascendingOrder);
}
catch (Exception e) {
throw new RuntimeException("Could not initialize basic comparator " + comparatorClass.getName(), e);
}
}
private static final Map<Class<?>, BasicTypeInfo<?>> TYPES = new HashMap<Class<?>, BasicTypeInfo<?>>();
static {
TYPES.put(String.class, STRING_TYPE_INFO);
TYPES.put(Boolean.class, BOOLEAN_TYPE_INFO);
TYPES.put(boolean.class, BOOLEAN_TYPE_INFO);
TYPES.put(Byte.class, BYTE_TYPE_INFO);
TYPES.put(byte.class, BYTE_TYPE_INFO);
TYPES.put(Short.class, SHORT_TYPE_INFO);
TYPES.put(short.class, SHORT_TYPE_INFO);
TYPES.put(Integer.class, INT_TYPE_INFO);
TYPES.put(int.class, INT_TYPE_INFO);
TYPES.put(Long.class, LONG_TYPE_INFO);
TYPES.put(long.class, LONG_TYPE_INFO);
TYPES.put(Float.class, FLOAT_TYPE_INFO);
TYPES.put(float.class, FLOAT_TYPE_INFO);
TYPES.put(Double.class, DOUBLE_TYPE_INFO);
TYPES.put(double.class, DOUBLE_TYPE_INFO);
TYPES.put(Character.class, CHAR_TYPE_INFO);
TYPES.put(char.class, CHAR_TYPE_INFO);
TYPES.put(Date.class, DATE_TYPE_INFO);
TYPES.put(Void.class, VOID_TYPE_INFO);
TYPES.put(void.class, VOID_TYPE_INFO);
TYPES.put(BigInteger.class, BIG_INT_TYPE_INFO);
TYPES.put(BigDecimal.class, BIG_DEC_TYPE_INFO);
}
}