blob: 1f76dc8847492602d9fec31b2a21e120f4341cfc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
import org.apache.flink.types.Row;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
/**
* A utility for reflection analysis on classes, to determine the return type of implementations of transformation
* functions.
*/
@Public
public class TypeExtractor {
/*
* NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy".
* The type hierarchy describes all types (Classes, ParameterizedTypes, TypeVariables, etc.) and intermediate
* types from a given type of a function or type (e.g. MyMapper, Tuple2) up to a current type
* (which depends on the method, e.g. MyPojoFieldType).
*
* Thus, it fully qualifies types down to the tuple/POJO field level.
*
* A typical typeHierarchy could look like:
*
* UDF: MyMapFunction.class
* top-level UDF: MyMapFunctionBase.class
* RichMapFunction: RichMapFunction.class
* MapFunction: MapFunction.class
* Function's OUT: Tuple1<MyPojo>
* user-defined POJO: MyPojo.class
* user-defined top-level POJO: MyPojoBase.class
* POJO field: Tuple1<String>
* Field type: String.class
*
*/
/** The fully-qualified name of Hadoop's Writable interface. */
// NOTE(review): kept as a string rather than a class literal, presumably so Hadoop is not a compile-time dependency — confirm.
private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable";
/** The fully-qualified name of the type information class used for Hadoop Writables. */
private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";
/** The fully-qualified name of Avro's generated-record base class. */
// NOTE(review): string-based for the same presumed reason as HADOOP_WRITABLE_CLASS (optional dependency) — confirm.
private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = "org.apache.avro.specific.SpecificRecordBase";
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
/**
 * Empty index table. Passed as lambda type-argument indices when no indices apply; for the lambda
 * output indices an empty table makes the extractor use the lambda's return type instead.
 */
public static final int[] NO_INDEX = new int[] {};
/**
 * Instances are created internally by the static extraction methods; the constructor is protected
 * rather than private so that it stays available for special use cases.
 */
protected TypeExtractor() {
// only create instances for special use cases
}
// --------------------------------------------------------------------------------------------
// TypeInfoFactory registry
// --------------------------------------------------------------------------------------------
/** Global registry mapping a type to the {@link TypeInfoFactory} class that produces its type information. */
private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
/**
 * Registers a type information factory globally for a certain type. All subsequent type extraction
 * operations use the provided factory for this type, giving it the highest precedence for the type.
 * Within a type hierarchy, a registered factory takes precedence over annotations on the same level,
 * but not over factories defined further down the hierarchy.
 *
 * @param t type for which a new factory is registered
 * @param factory type information factory that will produce {@link TypeInformation}
 * @throws IllegalArgumentException if {@code factory} is not a {@link TypeInfoFactory} (possible via raw types)
 * @throws InvalidTypesException if a factory is already registered for {@code t}
 */
private static void registerFactory(Type t, Class<? extends TypeInfoFactory> factory) {
	Preconditions.checkNotNull(t, "Type parameter must not be null.");
	Preconditions.checkNotNull(factory, "Factory parameter must not be null.");

	// the generic bound can be bypassed through raw types, so verify the class at runtime
	final boolean isFactoryClass = TypeInfoFactory.class.isAssignableFrom(factory);
	if (!isFactoryClass) {
		throw new IllegalArgumentException("Class is not a TypeInfoFactory.");
	}

	// a registration is permanent: never silently replace an existing factory
	final boolean alreadyRegistered = registeredTypeInfoFactories.containsKey(t);
	if (alreadyRegistered) {
		throw new InvalidTypesException("A TypeInfoFactory for type '" + t + "' is already registered.");
	}
	registeredTypeInfoFactories.put(t, factory);
}
// --------------------------------------------------------------------------------------------
// Function specific methods
// --------------------------------------------------------------------------------------------
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getMapReturnTypes(mapInterface, inType, functionName, allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
		String functionName, boolean allowMissing) {
	// MapFunction<IN, OUT>: IN is type argument 0, OUT is type argument 1.
	// For lambdas, IN is the first parameter and OUT is the lambda's return type.
	final int inputIndex = 0;
	final int outputIndex = 1;
	final int[] lambdaInputIndices = {0};
	return getUnaryOperatorReturnType(
		mapInterface,
		MapFunction.class,
		inputIndex,
		outputIndex,
		lambdaInputIndices,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getFlatMapReturnTypes(flatMapInterface, inType, functionName, allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType,
		String functionName, boolean allowMissing) {
	// FlatMapFunction<IN, OUT>: IN is type argument 0, OUT is type argument 1.
	// For lambdas, IN is the first parameter and OUT is type argument 0 of the second parameter.
	final int inputIndex = 0;
	final int outputIndex = 1;
	final int[] lambdaInputIndices = {0};
	final int[] lambdaOutputIndices = {1, 0};
	return getUnaryOperatorReturnType(
		flatMapInterface,
		FlatMapFunction.class,
		inputIndex,
		outputIndex,
		lambdaInputIndices,
		lambdaOutputIndices,
		inType,
		functionName,
		allowMissing);
}
/**
 * Extracts the result type of a {@link FoldFunction}.
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getFoldReturnTypes(foldInterface, inType, functionName, allowMissing);
}
/**
 * Extracts the result type of a {@link FoldFunction}.
 *
 * @deprecated will be removed in a future version
 */
@PublicEvolving
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing) {
	// FoldFunction<IN, OUT>: IN is type argument 0, OUT is type argument 1.
	// For lambdas, IN is the second parameter and OUT is the lambda's return type.
	final int inputIndex = 0;
	final int outputIndex = 1;
	final int[] lambdaInputIndices = {1};
	return getUnaryOperatorReturnType(
		foldInterface,
		FoldFunction.class,
		inputIndex,
		outputIndex,
		lambdaInputIndices,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
}
/**
 * Extracts the accumulator type {@code ACC} of an {@link AggregateFunction}.
 */
@PublicEvolving
public static <IN, ACC> TypeInformation<ACC> getAggregateFunctionAccumulatorType(
		AggregateFunction<IN, ACC, ?> function,
		TypeInformation<IN> inType,
		String functionName,
		boolean allowMissing) {
	// AggregateFunction<IN, ACC, OUT>: IN is type argument 0, ACC is type argument 1
	final int inputIndex = 0;
	final int accumulatorIndex = 1;
	final int[] lambdaInputIndices = {0};
	return getUnaryOperatorReturnType(
		function,
		AggregateFunction.class,
		inputIndex,
		accumulatorIndex,
		lambdaInputIndices,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
}
/**
 * Extracts the result type {@code OUT} of an {@link AggregateFunction}.
 */
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getAggregateFunctionReturnType(
		AggregateFunction<IN, ?, OUT> function,
		TypeInformation<IN> inType,
		String functionName,
		boolean allowMissing) {
	// AggregateFunction<IN, ACC, OUT>: IN is type argument 0, OUT is type argument 2.
	// No lambda indices are supplied for this function type.
	final int inputIndex = 0;
	final int resultIndex = 2;
	return getUnaryOperatorReturnType(
		function,
		AggregateFunction.class,
		inputIndex,
		resultIndex,
		NO_INDEX,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getMapPartitionReturnTypes(mapPartitionInterface, inType, functionName, allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType,
		String functionName, boolean allowMissing) {
	// MapPartitionFunction<IN, OUT>: IN is type argument 0, OUT is type argument 1.
	// For lambdas, IN is type argument 0 of the first parameter, OUT type argument 0 of the second.
	final int inputIndex = 0;
	final int outputIndex = 1;
	final int[] lambdaInputIndices = {0, 0};
	final int[] lambdaOutputIndices = {1, 0};
	return getUnaryOperatorReturnType(
		mapPartitionInterface,
		MapPartitionFunction.class,
		inputIndex,
		outputIndex,
		lambdaInputIndices,
		lambdaOutputIndices,
		inType,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getGroupReduceReturnTypes(groupReduceInterface, inType, functionName, allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType,
		String functionName, boolean allowMissing) {
	// GroupReduceFunction<IN, OUT>: IN is type argument 0, OUT is type argument 1.
	// For lambdas, IN is type argument 0 of the first parameter, OUT type argument 0 of the second.
	final int inputIndex = 0;
	final int outputIndex = 1;
	final int[] lambdaInputIndices = {0, 0};
	final int[] lambdaOutputIndices = {1, 0};
	return getUnaryOperatorReturnType(
		groupReduceInterface,
		GroupReduceFunction.class,
		inputIndex,
		outputIndex,
		lambdaInputIndices,
		lambdaOutputIndices,
		inType,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getGroupCombineReturnTypes(combineInterface, inType, functionName, allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
		String functionName, boolean allowMissing) {
	// GroupCombineFunction<IN, OUT>: IN is type argument 0, OUT is type argument 1.
	// For lambdas, IN is type argument 0 of the first parameter, OUT type argument 0 of the second.
	final int inputIndex = 0;
	final int outputIndex = 1;
	final int[] lambdaInputIndices = {0, 0};
	final int[] lambdaOutputIndices = {1, 0};
	return getUnaryOperatorReturnType(
		combineInterface,
		GroupCombineFunction.class,
		inputIndex,
		outputIndex,
		lambdaInputIndices,
		lambdaOutputIndices,
		inType,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getFlatJoinReturnTypes(joinInterface, in1Type, in2Type, functionName, allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) {
	// FlatJoinFunction<IN1, IN2, OUT>: type arguments 0, 1 and 2.
	// For lambdas, IN1/IN2 are the first two parameters and OUT is type argument 0 of the third.
	final int[] lambdaInput1Indices = {0};
	final int[] lambdaInput2Indices = {1};
	final int[] lambdaOutputIndices = {2, 0};
	return getBinaryOperatorReturnType(
		joinInterface,
		FlatJoinFunction.class,
		0, 1, 2,
		lambdaInput1Indices,
		lambdaInput2Indices,
		lambdaOutputIndices,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getJoinReturnTypes(joinInterface, in1Type, in2Type, functionName, allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) {
	// JoinFunction<IN1, IN2, OUT>: type arguments 0, 1 and 2.
	// For lambdas, IN1/IN2 are the first two parameters and OUT is the lambda's return type.
	final int[] lambdaInput1Indices = {0};
	final int[] lambdaInput2Indices = {1};
	return getBinaryOperatorReturnType(
		joinInterface,
		JoinFunction.class,
		0, 1, 2,
		lambdaInput1Indices,
		lambdaInput2Indices,
		NO_INDEX,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getCoGroupReturnTypes(coGroupInterface, in1Type, in2Type, functionName, allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) {
	// CoGroupFunction<IN1, IN2, OUT>: type arguments 0, 1 and 2.
	// For lambdas, each type is type argument 0 of the first, second and third parameter respectively.
	final int[] lambdaInput1Indices = {0, 0};
	final int[] lambdaInput2Indices = {1, 0};
	final int[] lambdaOutputIndices = {2, 0};
	return getBinaryOperatorReturnType(
		coGroupInterface,
		CoGroupFunction.class,
		0, 1, 2,
		lambdaInput1Indices,
		lambdaInput2Indices,
		lambdaOutputIndices,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getCrossReturnTypes(crossInterface, in1Type, in2Type, functionName, allowMissing);
}
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing) {
	// CrossFunction<IN1, IN2, OUT>: type arguments 0, 1 and 2.
	// For lambdas, IN1/IN2 are the first two parameters and OUT is the lambda's return type.
	final int[] lambdaInput1Indices = {0};
	final int[] lambdaInput2Indices = {1};
	return getBinaryOperatorReturnType(
		crossInterface,
		CrossFunction.class,
		0, 1, 2,
		lambdaInput1Indices,
		lambdaInput2Indices,
		NO_INDEX,
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getKeySelectorTypes(selectorInterface, inType, functionName, allowMissing);
}
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
		TypeInformation<IN> inType, String functionName, boolean allowMissing) {
	// KeySelector<IN, OUT>: IN is type argument 0, the key type OUT is type argument 1.
	// For lambdas, IN is the first parameter and the key type is the lambda's return type.
	final int inputIndex = 0;
	final int keyIndex = 1;
	final int[] lambdaInputIndices = {0};
	return getUnaryOperatorReturnType(
		selectorInterface,
		KeySelector.class,
		inputIndex,
		keyIndex,
		lambdaInputIndices,
		NO_INDEX,
		inType,
		functionName,
		allowMissing);
}
@PublicEvolving
public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner) {
	// no explicit function name; extraction failures are thrown rather than tolerated
	final String functionName = null;
	final boolean allowMissing = false;
	return getPartitionerTypes(partitioner, functionName, allowMissing);
}
/**
 * Determines the key type {@code T} of a {@link Partitioner}, either from the generic parameters of its
 * class or, for lambdas, from the type of the lambda's first SAM parameter.
 *
 * @param partitioner the partitioner to analyze
 * @param functionName name used for the {@link MissingTypeInfo}; falls back to {@code partitioner.toString()}
 * @param allowMissing if true, extraction failures yield a {@link MissingTypeInfo} instead of an exception
 */
@PublicEvolving
public static <T> TypeInformation<T> getPartitionerTypes(
		Partitioner<T> partitioner,
		String functionName,
		boolean allowMissing) {
	try {
		final LambdaExecutable lambda;
		try {
			lambda = checkAndExtractLambda(partitioner);
		} catch (TypeExtractionException e) {
			throw new InvalidTypesException("Internal error occurred.", e);
		}

		if (lambda == null) {
			// regular class: T is type argument 0 of Partitioner<T>
			return new TypeExtractor().privateCreateTypeInfo(
				Partitioner.class,
				partitioner.getClass(),
				0,
				null,
				null);
		}

		// reject lambdas whose generic parameter information has been erased
		validateLambdaGenericParameters(lambda);

		// The JVM may add extra parameters (e.g. for captured local variables), so the SAM parameters
		// are addressed from the end: the total count includes closure parameters, the SAM count does not.
		final int totalParameterCount = lambda.getParameterTypes().length;
		final Method samMethod = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class);
		final int samParameterCount = samMethod.getParameterTypes().length;
		final int[] keyParameterIndices = {0};
		final Type keyType = TypeExtractionUtils.extractTypeFromLambda(
			lambda,
			keyParameterIndices,
			totalParameterCount,
			samParameterCount);
		return new TypeExtractor().privateCreateTypeInfo(keyType, null, null);
	} catch (InvalidTypesException e) {
		if (!allowMissing) {
			throw e;
		}
		final String label = (functionName == null) ? partitioner.toString() : functionName;
		return (TypeInformation<T>) new MissingTypeInfo(label, e);
	}
}
/**
 * Determines the type produced by an {@link InputFormat}. A format implementing {@link ResultTypeQueryable}
 * supplies its own type; otherwise {@code IN} is extracted reflectively from type argument 0.
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
	if (!(inputFormatInterface instanceof ResultTypeQueryable)) {
		return new TypeExtractor().privateCreateTypeInfo(InputFormat.class, inputFormatInterface.getClass(), 0, null, null);
	}
	final ResultTypeQueryable<IN> queryable = (ResultTypeQueryable<IN>) inputFormatInterface;
	return queryable.getProducedType();
}
// --------------------------------------------------------------------------------------------
// Generic extraction methods
// --------------------------------------------------------------------------------------------
/**
 * Returns the unary operator's return type.
 *
 * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. The first index selects the
 * lambda parameter, every following index selects a type argument of the previously selected type. To
 * extract input type <b>IN</b> from the function given below one should pass {@code new int[] {0,1,0}} as
 * lambdaInputTypeArgumentIndices.
 *
 * <pre>{@code
 * OUT apply(Map<String, List<IN>> value)
 * }</pre>
 *
 * <p>If {@code function} implements {@link ResultTypeQueryable}, the input type is still validated, but
 * the produced type reported by the function is returned instead of an extracted one.
 *
 * @param function Function to extract the return type from
 * @param baseClass Base class of the function
 * @param inputTypeArgumentIndex Index of input type in the class specification
 * @param outputTypeArgumentIndex Index of output type in the class specification
 * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example.
 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
 *        An empty table means that the output type is the lambda's return type.
 * @param inType Type of the input elements (In case of an iterable, it is the element type)
 * @param functionName Function name, used to label the {@link MissingTypeInfo}; falls back to {@code function.toString()}
 * @param allowMissing If true, extraction failures yield a {@link MissingTypeInfo} instead of an exception
 * @param <IN> Input type
 * @param <OUT> Output type
 * @return TypeInformation of the return type of the function
 * @throws InvalidTypesException if the type cannot be determined and {@code allowMissing} is false
 * @throws IllegalArgumentException if the type argument indices required for the given kind of function are missing
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
	Function function,
	Class<?> baseClass,
	int inputTypeArgumentIndex,
	int outputTypeArgumentIndex,
	int[] lambdaInputTypeArgumentIndices,
	int[] lambdaOutputTypeArgumentIndices,
	TypeInformation<IN> inType,
	String functionName,
	boolean allowMissing) {
	try {
		final LambdaExecutable exec;
		try {
			exec = checkAndExtractLambda(function);
		} catch (TypeExtractionException e) {
			throw new InvalidTypesException("Internal error occurred.", e);
		}
		if (exec != null) {
			Preconditions.checkArgument(
				lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
				"Indices for input type arguments within lambda not provided");
			Preconditions.checkArgument(
				lambdaOutputTypeArgumentIndices != null,
				"Indices for output type arguments within lambda not provided");
			// check for lambda type erasure
			validateLambdaGenericParameters(exec);
			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
			// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
			final int paramLen = exec.getParameterTypes().length;
			final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
			// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
			final int baseParametersLen = sam.getParameterTypes().length;
			// executable references "this" implicitly
			if (paramLen <= 0) {
				// executable declaring class can also be a super class of the input type
				// we only validate if the executable exists in input type
				validateInputContainsExecutable(exec, inType);
			}
			else {
				final Type input = TypeExtractionUtils.extractTypeFromLambda(
					exec,
					lambdaInputTypeArgumentIndices,
					paramLen,
					baseParametersLen);
				validateInputType(input, inType);
			}
			if (function instanceof ResultTypeQueryable) {
				return ((ResultTypeQueryable<OUT>) function).getProducedType();
			}
			final Type output;
			if (lambdaOutputTypeArgumentIndices.length > 0) {
				// output is nested in a parameter (e.g. a collector's type argument)
				output = TypeExtractionUtils.extractTypeFromLambda(
					exec,
					lambdaOutputTypeArgumentIndices,
					paramLen,
					baseParametersLen);
			} else {
				// no indices: the output is the lambda's own return type
				output = exec.getReturnType();
			}
			return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
		} else {
			Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
			Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
			validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
			if (function instanceof ResultTypeQueryable) {
				return ((ResultTypeQueryable<OUT>) function).getProducedType();
			}
			return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null);
		}
	}
	catch (InvalidTypesException e) {
		if (allowMissing) {
			return (TypeInformation<OUT>) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e);
		} else {
			throw e;
		}
	}
}
/**
 * Returns the binary operator's return type.
 *
 * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. The first index selects the
 * lambda parameter, every following index selects a type argument of the previously selected type. To
 * extract input type <b>IN1</b> from the function given below one should pass {@code new int[] {0,1,0}} as
 * lambdaInput1TypeArgumentIndices.
 *
 * <pre>{@code
 * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2)
 * }</pre>
 *
 * <p>If {@code function} implements {@link ResultTypeQueryable}, both input types are still validated, but
 * the produced type reported by the function is returned instead of an extracted one.
 *
 * @param function Function to extract the return type from
 * @param baseClass Base class of the function
 * @param input1TypeArgumentIndex Index of first input type in the class specification
 * @param input2TypeArgumentIndex Index of second input type in the class specification
 * @param outputTypeArgumentIndex Index of output type in the class specification
 * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example.
 * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example.
 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
 *        An empty table means that the output type is the lambda's return type.
 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
 * @param functionName Function name, used to label the {@link MissingTypeInfo}; falls back to {@code function.toString()}
 * @param allowMissing If true, extraction failures yield a {@link MissingTypeInfo} instead of an exception
 * @param <IN1> Left side input type
 * @param <IN2> Right side input type
 * @param <OUT> Output type
 * @return TypeInformation of the return type of the function
 * @throws InvalidTypesException if the type cannot be determined and {@code allowMissing} is false
 * @throws IllegalArgumentException if the type argument indices required for the given kind of function are missing
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
	Function function,
	Class<?> baseClass,
	int input1TypeArgumentIndex,
	int input2TypeArgumentIndex,
	int outputTypeArgumentIndex,
	int[] lambdaInput1TypeArgumentIndices,
	int[] lambdaInput2TypeArgumentIndices,
	int[] lambdaOutputTypeArgumentIndices,
	TypeInformation<IN1> in1Type,
	TypeInformation<IN2> in2Type,
	String functionName,
	boolean allowMissing) {
	try {
		final LambdaExecutable exec;
		try {
			exec = checkAndExtractLambda(function);
		} catch (TypeExtractionException e) {
			throw new InvalidTypesException("Internal error occurred.", e);
		}
		if (exec != null) {
			Preconditions.checkArgument(
				lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
				"Indices for first input type arguments within lambda not provided");
			// validate the second table against its own length (previously checked the first table by mistake)
			Preconditions.checkArgument(
				lambdaInput2TypeArgumentIndices != null && lambdaInput2TypeArgumentIndices.length >= 1,
				"Indices for second input type arguments within lambda not provided");
			Preconditions.checkArgument(
				lambdaOutputTypeArgumentIndices != null,
				"Indices for output type arguments within lambda not provided");
			// check for lambda type erasure
			validateLambdaGenericParameters(exec);
			final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
			// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
			final int baseParametersLen = sam.getParameterTypes().length;
			// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
			final int paramLen = exec.getParameterTypes().length;
			final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
				exec,
				lambdaInput1TypeArgumentIndices,
				paramLen,
				baseParametersLen);
			final Type input2 = TypeExtractionUtils.extractTypeFromLambda(
				exec,
				lambdaInput2TypeArgumentIndices,
				paramLen,
				baseParametersLen);
			validateInputType(input1, in1Type);
			validateInputType(input2, in2Type);
			if (function instanceof ResultTypeQueryable) {
				return ((ResultTypeQueryable<OUT>) function).getProducedType();
			}
			final Type output;
			if (lambdaOutputTypeArgumentIndices.length > 0) {
				// output is nested in a parameter (e.g. a collector's type argument)
				output = TypeExtractionUtils.extractTypeFromLambda(
					exec,
					lambdaOutputTypeArgumentIndices,
					paramLen,
					baseParametersLen);
			} else {
				// no indices: the output is the lambda's own return type
				output = exec.getReturnType();
			}
			return new TypeExtractor().privateCreateTypeInfo(
				output,
				in1Type,
				in2Type);
		}
		else {
			Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
			Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
			Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
			validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
			validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
			if (function instanceof ResultTypeQueryable) {
				return ((ResultTypeQueryable<OUT>) function).getProducedType();
			}
			return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, in1Type, in2Type);
		}
	}
	catch (InvalidTypesException e) {
		if (allowMissing) {
			return (TypeInformation<OUT>) new MissingTypeInfo(functionName != null ? functionName : function.toString(), e);
		} else {
			throw e;
		}
	}
}
// --------------------------------------------------------------------------------------------
// Create type information
// --------------------------------------------------------------------------------------------
/**
 * Creates type information for the given class by delegating to {@link #createTypeInfo(Type)}.
 */
@SuppressWarnings("unchecked")
public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
	final Type reflectiveType = type;
	return (TypeInformation<T>) createTypeInfo(reflectiveType);
}
/**
 * Creates type information for an arbitrary reflective type.
 *
 * @throws InvalidTypesException if no type information could be extracted
 */
public static TypeInformation<?> createTypeInfo(Type t) {
	final TypeInformation<?> extracted = new TypeExtractor().privateCreateTypeInfo(t);
	if (extracted != null) {
		return extracted;
	}
	throw new InvalidTypesException("Could not extract type information.");
}
/**
 * Creates a {@link TypeInformation} from the given parameters.
 *
 * <p>If the given {@code instance} implements {@link ResultTypeQueryable}, the type information it
 * reports is used. Otherwise, the type information is derived from the given class information.
 *
 * @param instance instance to determine type information for
 * @param baseClass base class of {@code instance}
 * @param clazz class of {@code instance}
 * @param returnParamPos index of the return type in the type arguments of {@code clazz}
 * @param <OUT> output type
 * @return type information
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) {
	if (!(instance instanceof ResultTypeQueryable)) {
		return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
	}
	final ResultTypeQueryable<OUT> queryable = (ResultTypeQueryable<OUT>) instance;
	return queryable.getProducedType();
}
/**
 * Creates type information for the type argument at {@code returnParamPos} of {@code baseClass}, as
 * parameterized by {@code clazz}, using the optional input types to resolve type variables.
 *
 * @throws InvalidTypesException if no type information could be extracted
 */
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	final TypeInformation<OUT> result =
		new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);
	if (result != null) {
		return result;
	}
	throw new InvalidTypesException("Could not extract type information.");
}
// ----------------------------------- private methods ----------------------------------------
/** Analyzes {@code t} with a hierarchy that initially contains only {@code t} itself. */
private TypeInformation<?> privateCreateTypeInfo(Type t) {
	final ArrayList<Type> hierarchy = new ArrayList<Type>();
	hierarchy.add(t);
	return createTypeInfoWithTypeHierarchy(hierarchy, t, null, null);
}
// for (Rich)Functions
@SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	final ArrayList<Type> hierarchy = new ArrayList<Type>();
	final Type returnType = getParameterType(baseClass, hierarchy, clazz, returnParamPos);

	// a return type that is still a type variable may be resolvable directly from the input types
	if (returnType instanceof TypeVariable<?>) {
		final TypeVariable<?> returnVariable = (TypeVariable<?>) returnType;
		final TypeInformation<OUT> fromInputs =
			(TypeInformation<OUT>) createTypeInfoFromInputs(returnVariable, hierarchy, in1Type, in2Type);
		if (fromInputs != null) {
			return fromInputs;
		}
	}

	// otherwise analyze the collected type hierarchy
	return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(hierarchy, returnType, in1Type, in2Type);
}
// for LambdaFunctions
@SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
	// lambdas have no class hierarchy to walk, so analysis starts from an empty hierarchy
	final ArrayList<Type> emptyHierarchy = new ArrayList<Type>();
	return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(emptyHierarchy, returnType, in1Type, in2Type);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
// Creates the TypeInformation for type 't'. The strategies are tried in this order:
// type factory, Tuple subclass, type variable, generic array, parameterized type, plain class.
//
// typeHierarchy: types collected so far; used to materialize type variables. The Tuple branch
//                below appends to it, so the caller's list is modified.
// in1Type/in2Type: type information of the function inputs, used as a last resort to determine
//                type variables; may be null.
// Throws InvalidTypesException if no strategy applies or a type variable cannot be determined.
// check if type information can be created using a type factory
final TypeInformation<OUT> typeFromFactory = createTypeInfoFromFactory(t, typeHierarchy, in1Type, in2Type);
if (typeFromFactory != null) {
return typeFromFactory;
}
// check if type is a subclass of tuple
else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
Type curT = t;
// do not allow usage of Tuple as type
if (typeToClass(t).equals(Tuple.class)) {
throw new InvalidTypesException(
"Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.");
}
// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
// collect the types while moving up for a later top-down
while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) {
typeHierarchy.add(curT);
curT = typeToClass(curT).getGenericSuperclass();
}
// Tuple0 is the only direct Tuple subclass that is accepted without generics
if(curT == Tuple0.class) {
return new TupleTypeInfo(Tuple0.class);
}
// check if immediate child of Tuple has generics
if (curT instanceof Class<?>) {
throw new InvalidTypesException("Tuple needs to be parameterized by using generics.");
}
typeHierarchy.add(curT);
// create the type information for the subtypes
// (non-lenient: throws if a subtype cannot be determined; returns null if 't' declares
// more fields than the tuple has type arguments)
final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
else {
return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
}
}
// return tuple info
return new TupleTypeInfo(typeToClass(t), subTypesInfo);
}
// type depends on another type
// e.g. class MyMapper<E> extends MapFunction<String, E>
else if (t instanceof TypeVariable) {
Type typeVar = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) t);
// the variable resolved to a concrete type: extract that type instead
if (!(typeVar instanceof TypeVariable)) {
return createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type);
}
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
else {
TypeInformation<OUT> typeInfo = (TypeInformation<OUT>) createTypeInfoFromInputs((TypeVariable<?>) t, typeHierarchy, in1Type, in2Type);
if (typeInfo != null) {
return typeInfo;
} else {
throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) t).getName() + "' in '"
+ ((TypeVariable<?>) t).getGenericDeclaration() + "' could not be determined. This is most likely a type erasure problem. "
+ "The type extraction currently supports types with generic variables only in cases where "
+ "all variables in the return type can be deduced from the input type(s).");
}
}
}
// arrays with generics
else if (t instanceof GenericArrayType) {
GenericArrayType genericArray = (GenericArrayType) t;
Type componentType = genericArray.getGenericComponentType();
// due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class
if (componentType instanceof Class) {
Class<?> componentClass = (Class<?>) componentType;
Class<OUT> classArray = (Class<OUT>) (java.lang.reflect.Array.newInstance(componentClass, 0).getClass());
return getForClass(classArray);
} else {
// the component type is itself generic: extract it recursively and wrap it in an object array
TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy(
typeHierarchy,
genericArray.getGenericComponentType(),
in1Type,
in2Type);
Class<?> componentClass = componentInfo.getTypeClass();
Class<OUT> classArray = (Class<OUT>) (java.lang.reflect.Array.newInstance(componentClass, 0).getClass());
return ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo);
}
}
// objects with generics are treated as Class first
// (the parameterized form is passed along so a POJO analysis can use the concrete type arguments)
else if (t instanceof ParameterizedType) {
return (TypeInformation<OUT>) privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
return privateGetForClass((Class<OUT>) t, typeHierarchy);
}
// any other Type implementation (e.g. a WildcardType) is not supported
throw new InvalidTypesException("Type Information could not be created.");
}
/**
 * Tries to infer the type information of a return type variable from the input type information.
 *
 * <p>The variable is first materialized against {@code returnTypeHierarchy}. If it resolves to a
 * concrete type, type information is created for that type directly. Otherwise the leading
 * {@link Function} part of the hierarchy is copied, and the type arguments of its last entry (the
 * function's parameterized base type) are matched against input 1 and then input 2.
 *
 * @param returnTypeVar type variable of the return type to resolve
 * @param returnTypeHierarchy type hierarchy collected while extracting the return type
 * @param in1TypeInfo type information of the first input, may be null
 * @param in2TypeInfo type information of the second input, may be null
 * @return the inferred type information, or null if it cannot be derived from the inputs
 */
private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
		TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {

	final Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar);

	// variable could be resolved to a concrete type
	if (!(matReturnTypeVar instanceof TypeVariable)) {
		return createTypeInfoWithTypeHierarchy(returnTypeHierarchy, matReturnTypeVar, in1TypeInfo, in2TypeInfo);
	}
	// continue with the deepest type variable the hierarchy leads to
	final TypeVariable<?> deepestReturnTypeVar = (TypeVariable<?>) matReturnTypeVar;

	// no input information exists
	if (in1TypeInfo == null && in2TypeInfo == null) {
		return null;
	}

	// create a new type hierarchy for the input by copying the function part of the type hierarchy
	final ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
	for (Type t : returnTypeHierarchy) {
		if (isClassType(t) && Function.class.isAssignableFrom(typeToClass(t)) && typeToClass(t) != Function.class) {
			inputTypeHierarchy.add(t);
		}
		else {
			break;
		}
	}

	// The hierarchy does not start with a function (e.g. extraction of a plain class), or the
	// function's base type is raw: the inputs cannot be related to the variable. Returning null
	// lets the caller report a proper InvalidTypesException instead of an
	// IndexOutOfBounds/ClassCast exception.
	if (inputTypeHierarchy.isEmpty()) {
		return null;
	}
	final Type lastFunctionType = inputTypeHierarchy.get(inputTypeHierarchy.size() - 1);
	if (!(lastFunctionType instanceof ParameterizedType)) {
		return null;
	}
	final Type[] baseTypeArguments = ((ParameterizedType) lastFunctionType).getActualTypeArguments();

	TypeInformation<?> info = null;
	if (in1TypeInfo != null && baseTypeArguments.length > 0) {
		// find the deepest type variable that describes the type of input 1
		info = createTypeInfoFromInput(deepestReturnTypeVar, new ArrayList<Type>(inputTypeHierarchy), baseTypeArguments[0], in1TypeInfo);
	}
	if (info == null && in2TypeInfo != null && baseTypeArguments.length > 1) {
		// find the deepest type variable that describes the type of input 2
		info = createTypeInfoFromInput(deepestReturnTypeVar, new ArrayList<Type>(inputTypeHierarchy), baseTypeArguments[1], in2TypeInfo);
	}
	return info;
}
/**
* Finds the type information to a type variable.
*
* It solves the following:
*
* Return the type information for "returnTypeVar" given that "inType" has type information "inTypeInfo".
* Thus "inType" must contain "returnTypeVar" in an "inputTypeHierarchy", otherwise null is returned.
*
* The input is searched recursively: through the type parameters of a type that has a
* TypeInfoFactory, through type variables, array components, tuple fields and POJO fields.
* Note that the tuple branch appends to "inputTypeHierarchy", and the POJO branch passes it to
* getTypeHierarchy to build the POJO's hierarchy.
*
* @param returnTypeVar type variable whose type information is searched for
* @param inputTypeHierarchy type hierarchy used to materialize type variables of the input
* @param inType (generic) type of the input
* @param inTypeInfo type information of the input
* @return the type information of "returnTypeVar", or null if it is not found in "inType"
* @throws InvalidTypesException if the TypeInformation of a factory-backed input does not map
*         all of the factory-defining type's parameters via getGenericParameters()
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation<?> info = null;
// use a factory to find corresponding type information to type variable
final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType);
if (factory != null) {
// the type that defines the factory is last in factory hierarchy
final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1);
// defining type has generics, the factory need to be asked for a mapping of subtypes to type information
if (factoryDefiningType instanceof ParameterizedType) {
final Type[] typeParams = typeToClass(factoryDefiningType).getTypeParameters();
final Type[] actualParams = ((ParameterizedType) factoryDefiningType).getActualTypeArguments();
// go thru all elements and search for type variables
for (int i = 0; i < actualParams.length; i++) {
final Map<String, TypeInformation<?>> componentInfo = inTypeInfo.getGenericParameters();
final String typeParamName = typeParams[i].toString();
if (!componentInfo.containsKey(typeParamName) || componentInfo.get(typeParamName) == null) {
throw new InvalidTypesException("TypeInformation '" + inTypeInfo.getClass().getSimpleName() +
"' does not supply a mapping of TypeVariable '" + typeParamName + "' to corresponding TypeInformation. " +
"Input type inference can only produce a result with this information. " +
"Please implement method 'TypeInformation.getGenericParameters()' for this.");
}
// NOTE(review): the shared factoryHierarchy (not a copy) is passed to every recursive call
info = createTypeInfoFromInput(returnTypeVar, factoryHierarchy, actualParams[i], componentInfo.get(typeParamName));
if (info != null) {
break;
}
}
}
}
// the input is a type variable
else if (sameTypeVars(inType, returnTypeVar)) {
return inTypeInfo;
}
else if (inType instanceof TypeVariable) {
Type resolvedInType = materializeTypeVariable(inputTypeHierarchy, (TypeVariable<?>) inType);
// only recurse if materialization made progress; recursing with the same type would not terminate
if (resolvedInType != inType) {
info = createTypeInfoFromInput(returnTypeVar, inputTypeHierarchy, resolvedInType, inTypeInfo);
}
}
// input is an array
else if (inType instanceof GenericArrayType) {
TypeInformation<?> componentInfo = null;
if (inTypeInfo instanceof BasicArrayTypeInfo) {
componentInfo = ((BasicArrayTypeInfo<?,?>) inTypeInfo).getComponentInfo();
}
else if (inTypeInfo instanceof PrimitiveArrayTypeInfo) {
componentInfo = BasicTypeInfo.getInfoFor(inTypeInfo.getTypeClass().getComponentType());
}
else if (inTypeInfo instanceof ObjectArrayTypeInfo) {
componentInfo = ((ObjectArrayTypeInfo<?,?>) inTypeInfo).getComponentInfo();
}
// NOTE(review): componentInfo stays null if inTypeInfo is none of the array type infos above
info = createTypeInfoFromInput(returnTypeVar, inputTypeHierarchy, ((GenericArrayType) inType).getGenericComponentType(), componentInfo);
}
// the input is a tuple
else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) {
ParameterizedType tupleBaseClass;
// get tuple from possible tuple subclass
// (every visited type, including the immediate Tuple child, is appended to inputTypeHierarchy)
while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) {
inputTypeHierarchy.add(inType);
inType = typeToClass(inType).getGenericSuperclass();
}
inputTypeHierarchy.add(inType);
// we can assume to be parameterized since we
// already did input validation
tupleBaseClass = (ParameterizedType) inType;
Type[] tupleElements = tupleBaseClass.getActualTypeArguments();
// go thru all tuple elements and search for type variables
for (int i = 0; i < tupleElements.length; i++) {
info = createTypeInfoFromInput(returnTypeVar, inputTypeHierarchy, tupleElements[i], ((TupleTypeInfo<?>) inTypeInfo).getTypeAt(i));
if(info != null) {
break;
}
}
}
// the input is a pojo
else if (inTypeInfo instanceof PojoTypeInfo && isClassType(inType)) {
// build the entire type hierarchy for the pojo
getTypeHierarchy(inputTypeHierarchy, inType, Object.class);
// determine a field containing the type variable
List<Field> fields = getAllDeclaredFields(typeToClass(inType), false);
for (Field field : fields) {
Type fieldType = field.getGenericType();
// the field's type is (after materialization) exactly the searched variable
if (fieldType instanceof TypeVariable && sameTypeVars(returnTypeVar, materializeTypeVariable(inputTypeHierarchy, (TypeVariable<?>) fieldType))) {
return getTypeOfPojoField(inTypeInfo, field);
}
// the field's type may contain the variable somewhere inside: search it recursively
else if (fieldType instanceof ParameterizedType || fieldType instanceof GenericArrayType) {
ArrayList<Type> typeHierarchyWithFieldType = new ArrayList<>(inputTypeHierarchy);
typeHierarchyWithFieldType.add(fieldType);
TypeInformation<?> foundInfo = createTypeInfoFromInput(returnTypeVar, typeHierarchyWithFieldType, fieldType, getTypeOfPojoField(inTypeInfo, field));
if (foundInfo != null) {
return foundInfo;
}
}
}
}
return info;
}
/**
 * Creates the TypeInformation for every type argument of a type that expects a fixed number of
 * subtypes (e.g. TupleXX).
 *
 * <p>Type arguments that are type variables are first materialized via the type hierarchy. If that
 * fails, they are derived from the input type information as a last attempt.
 *
 * @param originalType most concrete subclass
 * @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
 * @param typeHierarchy necessary for type inference
 * @param in1Type necessary for type inference
 * @param in2Type necessary for type inference
 * @param lenient if true, subtypes that cannot be determined are left as null instead of
 *                causing an InvalidTypesException
 * @return array containing the TypeInformation of the subtypes, or null if (in non-lenient mode)
 *         the original type declares more fields than the defining type has subtypes
 */
private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
		ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, boolean lenient) {

	final Type[] typeArguments = definingType.getActualTypeArguments();
	final int arity = typeArguments.length;

	// resolve type arguments that are type variables as far as the hierarchy allows;
	// classes and parameterized types are taken as they are
	final Type[] subtypes = new Type[arity];
	for (int pos = 0; pos < arity; pos++) {
		final Type argument = typeArguments[pos];
		subtypes[pos] = (argument instanceof TypeVariable<?>)
			? materializeTypeVariable(typeHierarchy, (TypeVariable<?>) argument)
			: argument;
	}

	final TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[arity];
	for (int pos = 0; pos < arity; pos++) {
		final Type subtype = subtypes[pos];
		final ArrayList<Type> subTypeHierarchy = new ArrayList<>(typeHierarchy);
		subTypeHierarchy.add(subtype);

		if (subtype instanceof TypeVariable<?>) {
			// materialization did not succeed: derive the variable from the inputs as a last attempt
			final TypeVariable<?> variable = (TypeVariable<?>) subtype;
			subTypesInfo[pos] = createTypeInfoFromInputs(variable, subTypeHierarchy, in1Type, in2Type);
			if (subTypesInfo[pos] == null && !lenient) {
				throw new InvalidTypesException("Type of TypeVariable '" + variable.getName() + "' in '"
					+ variable.getGenericDeclaration()
					+ "' could not be determined. This is most likely a type erasure problem. "
					+ "The type extraction currently supports types with generic variables only in cases where "
					+ "all variables in the return type can be deduced from the input type(s).");
			}
			continue;
		}

		try {
			subTypesInfo[pos] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtype, in1Type, in2Type);
		} catch (InvalidTypesException e) {
			if (!lenient) {
				throw e;
			}
			// lenient mode: leave this subtype undetermined
			subTypesInfo[pos] = null;
		}
	}

	if (lenient) {
		return subTypesInfo;
	}

	// a class that declares more fields than the defining type has type arguments
	// needs to be treated as a POJO instead
	final Class<?> originalTypeAsClass = isClassType(originalType) ? typeToClass(originalType) : null;
	checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
	return countFieldsInClass(originalTypeAsClass) > subTypesInfo.length ? null : subTypesInfo;
}
/**
 * Creates type information using a TypeInfoFactory, if one is defined for the given type or for
 * one of its super types.
 *
 * <p>If the type that declares the factory is parameterized, its type parameters are inferred
 * leniently (undeterminable ones map to null). They are handed to the factory keyed by the type
 * variable's string form.
 *
 * @return the type information created by the closest factory, or null if there is no factory
 * @throws InvalidTypesException if the factory returns null
 */
@SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoFromFactory(
		Type t, ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {

	final ArrayList<Type> factoryHierarchy = new ArrayList<>(typeHierarchy);
	final TypeInfoFactory<? super OUT> factory = getClosestFactory(factoryHierarchy, t);
	if (factory == null) {
		return null;
	}

	// the type that declares the factory is the last one added during the search
	final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1);

	final Map<String, TypeInformation<?>> genericParams;
	if (!(factoryDefiningType instanceof ParameterizedType)) {
		genericParams = Collections.emptyMap();
	} else {
		final ParameterizedType paramDefiningType = (ParameterizedType) factoryDefiningType;
		final TypeVariable<?>[] typeVariables = typeToClass(paramDefiningType).getTypeParameters();
		// lenient mode never yields a null array
		final TypeInformation<?>[] subtypeInfo = createSubTypesInfo(t, paramDefiningType, factoryHierarchy, in1Type, in2Type, true);
		assert subtypeInfo != null;

		final Map<String, TypeInformation<?>> inferred = new HashMap<>();
		for (int pos = 0; pos < subtypeInfo.length; pos++) {
			inferred.put(typeVariables[pos].toString(), subtypeInfo[pos]);
		}
		genericParams = inferred;
	}

	final TypeInformation<OUT> createdTypeInfo = (TypeInformation<OUT>) factory.createTypeInfo(t, genericParams);
	if (createdTypeInfo == null) {
		throw new InvalidTypesException("TypeInfoFactory returned invalid TypeInformation 'null'");
	}
	return createdTypeInfo;
}
// --------------------------------------------------------------------------------------------
// Extract type parameters
// --------------------------------------------------------------------------------------------
/**
 * Returns the type argument at position {@code pos} that {@code clazz} binds for the generic
 * type {@code baseClass}. The interfaces of each visited class are searched before its superclass.
 *
 * @param baseClass generic class or interface whose type argument is requested
 * @param clazz class that (directly or indirectly) extends or implements {@code baseClass}
 * @param pos index of the requested type argument
 * @return the bound type argument (may itself be a type variable)
 * @throws InvalidTypesException if the type arguments could not be inferred
 */
@PublicEvolving
public static Type getParameterType(Class<?> baseClass, Class<?> clazz, int pos) {
	// this public variant does not record the traversed type hierarchy
	final ArrayList<Type> untrackedHierarchy = null;
	return getParameterType(baseClass, untrackedHierarchy, clazz, pos);
}
/**
 * Searches the generic interfaces and then the generic superclass of {@code clazz} for
 * {@code baseClass} and returns its type argument at position {@code pos}.
 *
 * <p>If {@code typeHierarchy} is non-null, {@code clazz} and the matching generic types on the path
 * to {@code baseClass} are appended to it.
 *
 * @throws InvalidTypesException if {@code baseClass} cannot be found in the hierarchy of {@code clazz}
 */
private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) {
	if (typeHierarchy != null) {
		typeHierarchy.add(clazz);
	}

	// interfaces take precedence over the superclass
	for (Type genericInterface : clazz.getGenericInterfaces()) {
		final Type fromInterface = getParameterTypeFromGenericType(baseClass, typeHierarchy, genericInterface, pos);
		if (fromInterface != null) {
			return fromInterface;
		}
	}

	final Type fromSuperclass = getParameterTypeFromGenericType(baseClass, typeHierarchy, clazz.getGenericSuperclass(), pos);
	if (fromSuperclass == null) {
		throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " +
			"Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
	}
	return fromSuperclass;
}
/**
 * Inspects one generic super type {@code t} while searching for {@code baseClass}.
 *
 * <p>If {@code t} is the parameterized base class, its type argument at {@code pos} is returned.
 * If {@code t} is a (parameterized or plain) subtype of the base class, the search continues in
 * its class. Otherwise null is returned. Matching types are appended to {@code typeHierarchy}
 * when it is non-null.
 */
private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {
	if (t instanceof ParameterizedType) {
		final ParameterizedType parameterized = (ParameterizedType) t;
		final Type raw = parameterized.getRawType();

		// t is the base class itself: the requested argument is bound right here
		if (baseClass.equals(raw)) {
			if (typeHierarchy != null) {
				typeHierarchy.add(t);
			}
			return parameterized.getActualTypeArguments()[pos];
		}

		// t is a parameterized subtype of the base class: continue in its raw class
		final Class<?> rawClass = (Class<?>) raw;
		if (baseClass.isAssignableFrom(rawClass)) {
			if (typeHierarchy != null) {
				typeHierarchy.add(t);
			}
			return getParameterType(baseClass, typeHierarchy, rawClass, pos);
		}
		return null;
	}

	// t is a plain (non-parameterized) subtype of the base class
	if (t instanceof Class<?> && baseClass.isAssignableFrom((Class<?>) t)) {
		if (typeHierarchy != null) {
			typeHierarchy.add(t);
		}
		return getParameterType(baseClass, typeHierarchy, (Class<?>) t, pos);
	}

	return null;
}
// --------------------------------------------------------------------------------------------
// Validate input
// --------------------------------------------------------------------------------------------
/**
 * Validates that the given type is compatible with the type information of an input.
 *
 * @throws InvalidTypesException prefixed with "Input mismatch: " if the validation fails
 */
private static void validateInputType(Type t, TypeInformation<?> inType) {
	final ArrayList<Type> hierarchy = new ArrayList<>();
	try {
		validateInfo(hierarchy, t, inType);
	} catch (InvalidTypesException e) {
		throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
	}
}
/**
 * Validates the input type information against the generic parameter at {@code inputParamPos}
 * of {@code baseClass} as bound by {@code clazz}.
 *
 * <p>If the generic parameter cannot be determined (e.g. for raw types), validation is skipped.
 *
 * @throws InvalidTypesException prefixed with "Input mismatch: " if the validation fails
 */
private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
	final ArrayList<Type> hierarchy = new ArrayList<>();

	final Type declaredInputType;
	try {
		declaredInputType = getParameterType(baseClass, hierarchy, clazz, inputParamPos);
	} catch (InvalidTypesException ignored) {
		// deliberately skipped: there is no generic parameter to validate against (e.g. raw types)
		return;
	}

	try {
		validateInfo(hierarchy, declaredInputType, inTypeInfo);
	} catch (InvalidTypesException e) {
		throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
	}
}
/**
 * Validates that a (generic) Java type is compatible with the given type information.
 *
 * <p>Type variables are materialized using {@code typeHierarchy}. A variable that cannot be
 * materialized is accepted without further checks. Array and tuple component types are validated
 * recursively. Type information kinds not handled explicitly are delegated to
 * {@code validateIfWritable}.
 *
 * @param typeHierarchy hierarchy used to materialize type variables; the tuple branch appends to it
 * @param type the Java type to validate
 * @param typeInfo the type information that the type has to match
 * @throws InvalidTypesException if the type does not match the type information
 */
@SuppressWarnings("unchecked")
private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) {
	if (type == null) {
		throw new InvalidTypesException("Unknown Error. Type is null.");
	}

	if (typeInfo == null) {
		throw new InvalidTypesException("Unknown Error. TypeInformation is null.");
	}

	if (!(type instanceof TypeVariable<?>)) {
		// check for Java Basic Types
		if (typeInfo instanceof BasicTypeInfo) {

			TypeInformation<?> actual;
			// check if basic type at all
			if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
				throw new InvalidTypesException("Basic type expected.");
			}
			// check if correct basic type
			if (!typeInfo.equals(actual)) {
				throw new InvalidTypesException("Basic type '" + typeInfo + "' expected but was '" + actual + "'.");
			}

		}
		// check for Java SQL time types
		else if (typeInfo instanceof SqlTimeTypeInfo) {

			TypeInformation<?> actual;
			// check if SQL time type at all
			if (!(type instanceof Class<?>) || (actual = SqlTimeTypeInfo.getInfoFor((Class<?>) type)) == null) {
				throw new InvalidTypesException("SQL time type expected.");
			}
			// check if correct SQL time type
			if (!typeInfo.equals(actual)) {
				throw new InvalidTypesException("SQL time type '" + typeInfo + "' expected but was '" + actual + "'.");
			}

		}
		// check for Java Tuples
		else if (typeInfo instanceof TupleTypeInfo) {
			// check if tuple at all
			if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) {
				throw new InvalidTypesException("Tuple type expected.");
			}

			// do not allow usage of Tuple as type
			if (isClassType(type) && typeToClass(type).equals(Tuple.class)) {
				throw new InvalidTypesException("Concrete subclass of Tuple expected.");
			}

			// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
			while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) {
				typeHierarchy.add(type);
				type = typeToClass(type).getGenericSuperclass();
			}

			// Tuple0 has no fields to validate
			if (type == Tuple0.class) {
				return;
			}

			// check if immediate child of Tuple has generics
			if (type instanceof Class<?>) {
				throw new InvalidTypesException("Parameterized Tuple type expected.");
			}

			TupleTypeInfo<?> tti = (TupleTypeInfo<?>) typeInfo;

			Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();

			if (subTypes.length != tti.getArity()) {
				throw new InvalidTypesException("Tuple arity '" + tti.getArity() + "' expected but was '"
					+ subTypes.length + "'.");
			}

			// every field gets its own copy of the hierarchy
			for (int i = 0; i < subTypes.length; i++) {
				validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
			}
		}
		// check for primitive array
		else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
			Type component;
			// check if array at all
			if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
				&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
				throw new InvalidTypesException("Array type expected.");
			}
			if (component instanceof TypeVariable<?>) {
				component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
				// an unresolvable component variable cannot be checked
				if (component instanceof TypeVariable) {
					return;
				}
			}
			if (!(component instanceof Class<?> && ((Class<?>) component).isPrimitive())) {
				throw new InvalidTypesException("Primitive component expected.");
			}
		}
		// check for basic array
		else if (typeInfo instanceof BasicArrayTypeInfo<?, ?>) {
			Type component;
			// check if array at all
			if (!(type instanceof Class<?> && ((Class<?>) type).isArray() && (component = ((Class<?>) type).getComponentType()) != null)
				&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
				throw new InvalidTypesException("Array type expected.");
			}

			if (component instanceof TypeVariable<?>) {
				component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
				// an unresolvable component variable cannot be checked
				if (component instanceof TypeVariable) {
					return;
				}
			}

			validateInfo(typeHierarchy, component, ((BasicArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());

		}
		// check for object array
		else if (typeInfo instanceof ObjectArrayTypeInfo<?, ?>) {
			// check if array at all
			if (!(type instanceof Class<?> && ((Class<?>) type).isArray()) && !(type instanceof GenericArrayType)) {
				throw new InvalidTypesException("Object array type expected.");
			}

			// check component
			Type component;
			if (type instanceof Class<?>) {
				component = ((Class<?>) type).getComponentType();
			} else {
				component = ((GenericArrayType) type).getGenericComponentType();
			}

			if (component instanceof TypeVariable<?>) {
				component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
				// an unresolvable component variable cannot be checked
				if (component instanceof TypeVariable) {
					return;
				}
			}

			validateInfo(typeHierarchy, component, ((ObjectArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
		}
		// check for value
		else if (typeInfo instanceof ValueTypeInfo<?>) {
			// check if value at all
			if (!(type instanceof Class<?> && Value.class.isAssignableFrom((Class<?>) type))) {
				throw new InvalidTypesException("Value type expected.");
			}

			TypeInformation<?> actual;
			// check value type contents
			if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
				throw new InvalidTypesException("Value type '" + typeInfo + "' expected but was '" + actual + "'.");
			}
		}
		// check for POJO
		else if (typeInfo instanceof PojoTypeInfo) {
			final Class<?> expectedClass = ((PojoTypeInfo<?>) typeInfo).getTypeClass();
			if (!(isClassType(type) && expectedClass == typeToClass(type))) {
				// 'type' is not necessarily a class type here (e.g. a generic array), so it must not be
				// dereferenced as a class when building the message (previously an NPE)
				final String actualName = isClassType(type) ? typeToClass(type).getCanonicalName() : type.toString();
				throw new InvalidTypesException("POJO type '"
					+ expectedClass.getCanonicalName() + "' expected but was '"
					+ actualName + "'.");
			}
		}
		// check for Enum
		else if (typeInfo instanceof EnumTypeInfo) {
			if (!(type instanceof Class<?> && Enum.class.isAssignableFrom((Class<?>) type))) {
				throw new InvalidTypesException("Enum type expected.");
			}
			// check enum type contents
			if (!(typeInfo.getTypeClass() == type)) {
				throw new InvalidTypesException("Enum type '" + typeInfo.getTypeClass().getCanonicalName() + "' expected but was '"
					+ typeToClass(type).getCanonicalName() + "'.");
			}
		}
		// check for generic object
		else if (typeInfo instanceof GenericTypeInfo<?>) {
			final Class<?> expectedClass = ((GenericTypeInfo<?>) typeInfo).getTypeClass();
			if (!(isClassType(type) && typeToClass(type).isAssignableFrom(expectedClass))) {
				// 'type' is not necessarily a class type here, so describe it without dereferencing a class
				final String actualName = isClassType(type) ? typeToClass(type).getCanonicalName() : type.toString();
				throw new InvalidTypesException("Generic type '"
					+ expectedClass.getCanonicalName() + "' or a subclass of it expected but was '"
					+ actualName + "'.");
			}
		}
		// check for Writable
		else {
			validateIfWritable(typeInfo, type);
		}
	} else {
		// validate against the materialized type; unresolvable type variables are accepted as they are
		type = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) type);
		if (!(type instanceof TypeVariable)) {
			validateInfo(typeHierarchy, type, typeInfo);
		}
	}
}
/**
 * Checks that the executable described by {@code exec} is one of the methods returned by
 * {@code getAllDeclaredMethods} or one of the declared constructors of the type class.
 *
 * @throws InvalidTypesException if no matching method or constructor exists
 */
private static void validateInputContainsExecutable(LambdaExecutable exec, TypeInformation<?> typeInfo) {
	final Class<?> typeClass = typeInfo.getTypeClass();

	for (Method candidate : getAllDeclaredMethods(typeClass)) {
		if (exec.executablesEquals(candidate)) {
			return;
		}
	}

	for (Constructor<?> candidate : typeClass.getDeclaredConstructors()) {
		if (exec.executablesEquals(candidate)) {
			return;
		}
	}

	throw new InvalidTypesException("Type contains no executable '" + exec.getName() + "'.");
}
// --------------------------------------------------------------------------------------------
// Utility methods
// --------------------------------------------------------------------------------------------
/**
 * Returns the type information factory for a type. The factory registry is consulted first;
 * otherwise the {@link TypeInfo} annotation of the type's class is used.
 *
 * @param t type to look up
 * @return a factory created via {@code InstantiationUtil.instantiate}, or null if neither a
 *         registered nor an annotated factory exists
 * @throws InvalidTypesException if the annotation does not name a TypeInfoFactory class
 */
@Internal
public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
	final Class<?> factoryClass;
	if (!registeredTypeInfoFactories.containsKey(t)) {
		// only (parameterized) classes can carry the annotation
		if (!isClassType(t)) {
			return null;
		}
		final TypeInfo typeInfoAnnotation = typeToClass(t).getAnnotation(TypeInfo.class);
		if (typeInfoAnnotation == null) {
			return null;
		}
		factoryClass = typeInfoAnnotation.value();
		// check for valid factory class
		if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
			throw new InvalidTypesException("TypeInfo annotation does not specify a valid TypeInfoFactory.");
		}
	} else {
		factoryClass = registeredTypeInfoFactories.get(t);
	}

	return (TypeInfoFactory<OUT>) InstantiationUtil.instantiate(factoryClass);
}
/**
 * Counts the entries of the hierarchy that are the given type, its raw class, or a parameterized
 * type whose raw class is the given type.
 *
 * @return number of items with equal type or same raw type
 */
private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
	final Class<?> rawOfType = isClassType(type) ? typeToClass(type) : null;
	int occurrences = 0;
	for (Type candidate : typeHierarchy) {
		final boolean matches = candidate == type
			|| (rawOfType != null && candidate == rawOfType)
			|| (isClassType(candidate) && typeToClass(candidate) == type);
		if (matches) {
			occurrences++;
		}
	}
	return occurrences;
}
/**
 * Walks up the superclass chain of {@code t}, stopping at Object, until a type information factory
 * is found.
 *
 * @param typeHierarchy hierarchy that receives every visited type; the last entry is the type
 *                      that declares the returned factory
 * @param t type for which a factory needs to be found
 * @return closest type information factory, or null if there is none in the type hierarchy
 */
private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
	TypeInfoFactory<? super OUT> closest = null;
	Type current = t;
	while (closest == null && isClassType(current) && !Object.class.equals(typeToClass(current))) {
		typeHierarchy.add(current);
		closest = getTypeInfoFactory(current);
		current = typeToClass(current).getGenericSuperclass();
		// interfaces and Object have no superclass
		if (current == null) {
			break;
		}
	}
	return closest;
}
/**
 * Counts the fields returned by {@link Class#getFields()} (public fields, including inherited ones)
 * that are neither static nor transient.
 */
private int countFieldsInClass(Class<?> clazz) {
	int count = 0;
	for (Field field : clazz.getFields()) {
		final int modifiers = field.getModifiers();
		if (Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers)) {
			continue;
		}
		count++;
	}
	return count;
}
/**
 * Validates every parameter type and then the return type of a lambda executable.
 */
private static void validateLambdaGenericParameters(LambdaExecutable exec) {
	// arguments first
	for (Type parameterType : exec.getParameterTypes()) {
		validateLambdaGenericParameter(parameterType);
	}
	// then the result
	final Type returnType = exec.getReturnType();
	validateLambdaGenericParameter(returnType);
}
/**
 * Rejects a lambda parameter or return type that is a plain Class declaring type parameters.
 * Such a type indicates that the compiler did not store the generic type information.
 *
 * @throws InvalidTypesException if the generic type parameters are missing
 */
private static void validateLambdaGenericParameter(Type t) {
	final boolean erasedGenericClass = t instanceof Class && ((Class<?>) t).getTypeParameters().length > 0;
	if (erasedGenericClass) {
		final Class<?> clazz = (Class<?>) t;
		throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. \n"
			+ "It seems that your compiler has not stored them into the .class file. \n"
			+ "Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. \n"
			+ "See the documentation for more information about how to compile jobs containing lambda expressions.");
	}
}
/**
 * Tries to find a concrete value (Class, ParameterizedType etc.) for a TypeVariable by traversing
 * the type hierarchy from its last entry to its first.
 *
 * <p>Whenever a parameterized entry binds the current variable to another type variable, the
 * search continues with that variable. The scan over the remaining parameters of the same entry
 * also uses the updated variable.
 *
 * @return the concrete type, or the most deeply resolved type variable if no concrete type is found
 */
private static Type materializeTypeVariable(ArrayList<Type> typeHierarchy, TypeVariable<?> typeVar) {
	TypeVariable<?> current = typeVar;

	for (int level = typeHierarchy.size() - 1; level >= 0; level--) {
		final Type entry = typeHierarchy.get(level);
		if (!(entry instanceof ParameterizedType)) {
			continue;
		}

		final ParameterizedType parameterized = (ParameterizedType) entry;
		final TypeVariable<?>[] declaredVariables = ((Class<?>) parameterized.getRawType()).getTypeParameters();
		for (int index = 0; index < declaredVariables.length; index++) {
			if (!sameTypeVars(declaredVariables[index], current)) {
				continue;
			}
			final Type boundArgument = parameterized.getActualTypeArguments()[index];
			if (!(boundArgument instanceof TypeVariable<?>)) {
				// concrete type found
				return boundArgument;
			}
			// bound to another variable: follow it
			current = (TypeVariable<?>) boundArgument;
		}
	}

	// could not be materialized (most likely type erasure): return the deepest variable
	return current;
}
/**
 * Creates type information from a given Class such as Integer, String[] or POJOs.
 *
 * <p>This method does not support ParameterizedTypes such as Tuples or complex type hierarchies.
 * In most cases {@link TypeExtractor#createTypeInfo(Type)} is the recommended method for type
 * extraction (a Class is a child of Type).
 *
 * @param clazz a Class to create TypeInformation for
 * @return TypeInformation that describes the passed Class
 */
public static <X> TypeInformation<X> getForClass(Class<X> clazz) {
	// the hierarchy starts with the analyzed class itself
	final ArrayList<Type> hierarchy = new ArrayList<>();
	hierarchy.add(clazz);
	final TypeExtractor extractor = new TypeExtractor();
	return extractor.privateGetForClass(clazz, hierarchy);
}
/**
 * Creates type information for a class with no parameterized form and no input type information.
 */
private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
	final ParameterizedType noParameterizedType = null;
	return privateGetForClass(clazz, typeHierarchy, noParameterizedType, null, null);
}
/**
* Creates type information for a class. The checks below run in a fixed order and the first
* one that applies wins: type factory, Object, Class, recursive type (the class appears more than
* once in the hierarchy), arrays, Hadoop Writables, basic types, SQL time types, Value subclasses,
* Tuple0, enums, Avro specific records, interfaces, POJOs. Anything else (including classes whose
* POJO analysis fails) becomes a GenericTypeInfo.
*
* @param clazz class to analyze, must not be null
* @param typeHierarchy type hierarchy used for materializing type variables
* @param parameterizedType parameterized form of clazz, passed on to the POJO analysis; may be null
* @param in1Type type information of the first input, may be null
* @param in2Type type information of the second input, may be null
* @return type information describing clazz
* @throws InvalidTypesException e.g. for Tuple subclasses other than Tuple0, whose field types
*         cannot be derived from the class alone
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
checkNotNull(clazz);
// check if type information can be produced using a factory
final TypeInformation<OUT> typeFromFactory = createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
if (typeFromFactory != null) {
return typeFromFactory;
}
// Object is handled as generic type info
if (clazz.equals(Object.class)) {
return new GenericTypeInfo<>(clazz);
}
// Class is handled as generic type info
if (clazz.equals(Class.class)) {
return new GenericTypeInfo<OUT>(clazz);
}
// recursive types are handled as generic type info
if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
return new GenericTypeInfo<>(clazz);
}
// check for arrays
if (clazz.isArray()) {
// primitive arrays: int[], byte[], ...
PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo = PrimitiveArrayTypeInfo.getInfoFor(clazz);
if (primitiveArrayInfo != null) {
return primitiveArrayInfo;
}
// basic type arrays: String[], Integer[], Double[]
BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
if (basicArrayInfo != null) {
return basicArrayInfo;
}
// object arrays
else {
// the component type is extracted with the full machinery (factories, POJOs, ...)
TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy(
typeHierarchy,
clazz.getComponentType(),
in1Type,
in2Type);
return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
}
}
// check for writable types
if (isHadoopWritable(clazz)) {
return createHadoopWritableTypeInfo(clazz);
}
// check for basic types
TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
if (basicTypeInfo != null) {
return basicTypeInfo;
}
// check for SQL time types
TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
if (timeTypeInfo != null) {
return timeTypeInfo;
}
// check for subclasses of Value
if (Value.class.isAssignableFrom(clazz)) {
Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
}
// check for subclasses of Tuple
if (Tuple.class.isAssignableFrom(clazz)) {
if(clazz == Tuple0.class) {
return new TupleTypeInfo(Tuple0.class);
}
throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
}
// check for Enums
if(Enum.class.isAssignableFrom(clazz)) {
return new EnumTypeInfo(clazz);
}
// special case for POJOs generated by Avro.
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
}
if (Modifier.isInterface(clazz.getModifiers())) {
// Interface has no members and is therefore not handled as POJO
return new GenericTypeInfo<OUT>(clazz);
}
try {
// the POJO analysis gets its own copy of the hierarchy
TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type);
if (pojoType != null) {
return pojoType;
}
} catch (InvalidTypesException e) {
if(LOG.isDebugEnabled()) {
LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e);
}
// ignore and create generic type info
}
// return a generic type
return new GenericTypeInfo<OUT>(clazz);
}
/**
 * Checks if the given field is a valid pojo field:
 * - it is public
 * OR
 * - there are getter and setter methods for the field.
 *
 * <p>Accessor names are matched case-insensitively with all underscores removed. A getter is
 * {@code get<Field>()}, {@code is<Field>()} or {@code <field>()} (Scala) without parameters; a
 * setter is {@code set<Field>(value)} or {@code <field>_$eq(value)} (Scala) returning void.
 * The accessor's type must equal the field's (materialized) generic type, its primitive wrapper,
 * or the field's unresolved type variable.
 *
 * <p>Lower-casing uses {@link java.util.Locale#ROOT}. A locale-sensitive conversion breaks matching,
 * e.g. in a Turkish default locale "getIndex" lower-cases to "get" + dotless i (U+0131) + "ndex",
 * which would no longer match the field "index".
 *
 * <p>A missing getter and/or setter is logged at INFO level.
 *
 * @param f field to check
 * @param clazz class of field
 * @param typeHierarchy type hierarchy for materializing generic types
 * @return true if the field is public or has both a matching getter and a matching setter
 */
private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) {
	if (Modifier.isPublic(f.getModifiers())) {
		return true;
	}

	final String fieldNameLow = f.getName().toLowerCase(java.util.Locale.ROOT).replace("_", "");
	// the candidate accessor names only depend on the field, so build them once
	final String getterName = "get" + fieldNameLow;
	final String booleanGetterName = "is" + fieldNameLow;
	final String setterName = "set" + fieldNameLow;
	final String scalaSetterName = fieldNameLow + "_$eq";

	Type fieldType = f.getGenericType();
	final Class<?> fieldTypeWrapper = ClassUtils.primitiveToWrapper(f.getType());
	TypeVariable<?> fieldTypeGeneric = null;
	if (fieldType instanceof TypeVariable) {
		// keep the type variable itself as well: accessors may be declared in terms of it
		fieldTypeGeneric = (TypeVariable<?>) fieldType;
		fieldType = materializeTypeVariable(typeHierarchy, fieldTypeGeneric);
	}

	boolean hasGetter = false;
	boolean hasSetter = false;
	for (Method m : clazz.getMethods()) {
		final String methodName = m.getName();
		// strip underscores but keep the Scala setter suffix "_$eq" intact
		final String methodNameLow = methodName.endsWith("_$eq") ?
			methodName.toLowerCase(java.util.Locale.ROOT).replace("_", "").replaceFirst("\\$eq$", "_\\$eq") :
			methodName.toLowerCase(java.util.Locale.ROOT).replace("_", "");

		// getter: "get<FieldName>", "is<FieldName>" or "<fieldName>" (Scala), no arguments,
		// returning the field type (or its wrapper, or the generic variant of it)
		if ((methodNameLow.equals(getterName) || methodNameLow.equals(booleanGetterName) || methodNameLow.equals(fieldNameLow))
				&& m.getParameterTypes().length == 0
				&& (m.getGenericReturnType().equals(fieldType)
					|| (fieldTypeWrapper != null && m.getReturnType().equals(fieldTypeWrapper))
					|| (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)))) {
			hasGetter = true;
		}

		// setter: "set<FieldName>" or "<fieldName>_$eq" (Scala), one parameter of the field's type,
		// returning void
		if ((methodNameLow.equals(setterName) || methodNameLow.equals(scalaSetterName))
				&& m.getParameterTypes().length == 1
				&& (m.getGenericParameterTypes()[0].equals(fieldType)
					|| (fieldTypeWrapper != null && m.getParameterTypes()[0].equals(fieldTypeWrapper))
					|| (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric)))
				&& m.getReturnType().equals(Void.TYPE)) {
			hasSetter = true;
		}

		if (hasGetter && hasSetter) {
			// nothing left to find
			break;
		}
	}

	if (hasGetter && hasSetter) {
		return true;
	}
	if (!hasGetter) {
		LOG.info(clazz + " does not contain a getter for field " + f.getName());
	}
	if (!hasSetter) {
		LOG.info(clazz + " does not contain a setter for field " + f.getName());
	}
	return false;
}
/**
 * Tries to analyze {@code clazz} as a POJO.
 *
 * <p>Returns a {@link GenericTypeInfo} directly when the class is not public or has no
 * (non-static, non-transient) fields. Returns {@code null} when the class fails a POJO rule:
 * <ul>
 *   <li>a field is not a valid POJO field;</li>
 *   <li>it declares a method named readObject/writeObject;</li>
 *   <li>it is concrete but lacks a no-argument constructor;</li>
 *   <li>its no-argument constructor is not public.</li>
 * </ul>
 * A {@code null} result makes the caller fall back to a generic type. Otherwise it returns a
 * {@link PojoTypeInfo}. A field whose type cannot be extracted is described by a generic type.
 *
 * <p>Note: {@code typeHierarchy} is extended in place with the POJO's own hierarchy.
 */
@SuppressWarnings("unchecked")
protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
		ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {

	// only public classes are analyzed as POJOs
	if (!Modifier.isPublic(clazz.getModifiers())) {
		LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " +
			"and must be processed as GenericType. Please read the Flink documentation " +
			"on \"Data Types & Serialization\" for details of the effect on performance.");
		return new GenericTypeInfo<OUT>(clazz);
	}

	// Add the POJO's own hierarchy. Use the parameterized type when given; otherwise build
	// it from the raw class if the incoming hierarchy holds at most the bottom-most entry.
	if (parameterizedType != null) {
		getTypeHierarchy(typeHierarchy, parameterizedType, Object.class);
	} else if (typeHierarchy.size() <= 1) {
		getTypeHierarchy(typeHierarchy, clazz, Object.class);
	}

	final List<Field> declaredFields = getAllDeclaredFields(clazz, false);
	if (declaredFields.isEmpty()) {
		LOG.info("No fields were detected for " + clazz + " so it cannot be used as a POJO type " +
			"and must be processed as GenericType. Please read the Flink documentation " +
			"on \"Data Types & Serialization\" for details of the effect on performance.");
		return new GenericTypeInfo<OUT>(clazz);
	}

	final List<PojoField> analyzedFields = new ArrayList<PojoField>(declaredFields.size());
	for (Field declaredField : declaredFields) {
		final Type genericFieldType = declaredField.getGenericType();
		if (!isValidPojoField(declaredField, clazz, typeHierarchy)) {
			LOG.info("Class " + clazz + " cannot be used as a POJO type because not all fields are valid POJO fields, " +
				"and must be processed as GenericType. Please read the Flink documentation " +
				"on \"Data Types & Serialization\" for details of the effect on performance.");
			return null;
		}
		try {
			// every field gets its own copy of the hierarchy, extended by the field's type
			final ArrayList<Type> fieldHierarchy = new ArrayList<Type>(typeHierarchy);
			fieldHierarchy.add(genericFieldType);
			final TypeInformation<?> fieldTypeInfo =
				createTypeInfoWithTypeHierarchy(fieldHierarchy, genericFieldType, in1Type, in2Type);
			analyzedFields.add(new PojoField(declaredField, fieldTypeInfo));
		} catch (InvalidTypesException e) {
			// extraction failed: describe the field by a generic type of its class (or Object)
			final Class<?> fallbackClass = isClassType(genericFieldType) ? typeToClass(genericFieldType) : Object.class;
			analyzedFields.add(new PojoField(declaredField, new GenericTypeInfo<OUT>((Class<OUT>) fallbackClass)));
		}
	}

	final CompositeType<OUT> pojoType = new PojoTypeInfo<OUT>(clazz, analyzedFields);

	// Validation of the POJO; returning null lets the caller create a generic type instead.
	for (Method declaredMethod : getAllDeclaredMethods(clazz)) {
		final String methodName = declaredMethod.getName();
		if (methodName.equals("readObject") || methodName.equals("writeObject")) {
			LOG.info("Class " + clazz + " contains custom serialization methods we do not call, so it cannot be used as a POJO type " +
				"and must be processed as GenericType. Please read the Flink documentation " +
				"on \"Data Types & Serialization\" for details of the effect on performance.");
			return null;
		}
	}

	// the serializer relies on the no-argument constructor
	Constructor<OUT> defaultConstructor = null;
	try {
		defaultConstructor = clazz.getDeclaredConstructor();
	} catch (NoSuchMethodException e) {
		final boolean abstractOrInterface = clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers());
		if (!abstractOrInterface) {
			LOG.info(clazz + " is missing a default constructor so it cannot be used as a POJO type " +
				"and must be processed as GenericType. Please read the Flink documentation " +
				"on \"Data Types & Serialization\" for details of the effect on performance.");
			return null;
		}
		LOG.info(clazz + " is abstract or an interface, having a concrete " +
			"type can increase performance.");
	}

	if (defaultConstructor != null && !Modifier.isPublic(defaultConstructor.getModifiers())) {
		LOG.info("The default constructor of " + clazz + " is not Public so it cannot be used as a POJO type " +
			"and must be processed as GenericType. Please read the Flink documentation " +
			"on \"Data Types & Serialization\" for details of the effect on performance.");
		return null;
	}

	// all checks passed
	return pojoType;
}
/**
 * Determines all declared fields of the given class and of all of its superclasses.
 * This is required because {@code Class.getDeclaredFields()} only returns the fields declared by
 * the class itself, and {@code Class.getFields()} only returns public fields.
 * Static and transient fields are skipped. The fields of the given class come first, followed by
 * those of each superclass walking up the hierarchy.
 *
 * @param clazz class to be analyzed; {@code null} yields an empty list
 * @param ignoreDuplicates if true, in case of duplicate field names only the lowest one
 *                         in a hierarchy will be returned; throws an exception otherwise
 * @return list of fields
 * @throws InvalidTypesException if {@code ignoreDuplicates} is false and a field name is declared
 *                               more than once in the class hierarchy
 */
@PublicEvolving
public static List<Field> getAllDeclaredFields(Class<?> clazz, boolean ignoreDuplicates) {
	final List<Field> result = new ArrayList<Field>();
	// walk with a separate variable so that 'clazz' still names the analyzed class in the error message
	for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
		for (Field field : current.getDeclaredFields()) {
			final int modifiers = field.getModifiers();
			if (Modifier.isTransient(modifiers) || Modifier.isStatic(modifiers)) {
				continue; // we have no use for transient or static fields
			}
			if (hasFieldWithSameName(field.getName(), result)) {
				if (ignoreDuplicates) {
					continue; // a field lower in the hierarchy with this name was already collected
				}
				throw new InvalidTypesException("The field " + field + " is already contained in the hierarchy of the "
					+ clazz + ". Please use unique field names through your classes hierarchy");
			}
			result.add(field);
		}
	}
	return result;
}
/**
 * Looks up a non-static, non-transient field by name in the hierarchy of {@code clazz}.
 * If several classes in the hierarchy declare a field of that name, the one declared lowest
 * in the hierarchy is returned.
 *
 * @param clazz class whose hierarchy is searched
 * @param name name of the field
 * @return the matching field, or {@code null} if no such field exists
 */
@PublicEvolving
public static Field getDeclaredField(Class<?> clazz, String name) {
	final List<Field> candidates = getAllDeclaredFields(clazz, true);
	for (int idx = 0; idx < candidates.size(); idx++) {
		final Field candidate = candidates.get(idx);
		if (candidate.getName().equals(name)) {
			return candidate;
		}
	}
	return null;
}
/**
 * Tells whether any field in {@code fields} has exactly the given name.
 *
 * @param name field name to look for
 * @param fields fields to search
 * @return true if a field with that name is contained in the list
 */
private static boolean hasFieldWithSameName(String name, List<Field> fields) {
	final int count = fields.size();
	for (int idx = 0; idx < count; idx++) {
		if (name.equals(fields.get(idx).getName())) {
			return true;
		}
	}
	return false;
}
/**
 * Returns the type information of the POJO field in {@code pojoInfo} whose name equals the
 * name of {@code field}, or {@code null} if there is none.
 * NOTE(review): assumes {@code pojoInfo} is a {@link PojoTypeInfo}; the cast fails otherwise
 * (unless its arity is zero).
 */
private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {
	int pos = 0;
	while (pos < pojoInfo.getArity()) {
		final PojoField candidate = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(pos);
		if (candidate.getField().getName().equals(field.getName())) {
			return candidate.getTypeInformation();
		}
		pos++;
	}
	return null;
}
/**
 * Extracts the type information of the given (non-null) object instance.
 * A fresh {@link TypeExtractor} is created for every call.
 *
 * @param value instance to inspect
 * @return the extracted type information
 */
public static <X> TypeInformation<X> getForObject(X value) {
	final TypeExtractor extractor = new TypeExtractor();
	return extractor.privateGetForObject(value);
}
/**
 * Extracts type information from a concrete instance.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>a type information factory for the value's class;</li>
 *   <li>for tuples, the runtime types of the tuple fields (tuple subclasses with a different
 *       number of fields are analyzed as POJOs);</li>
 *   <li>for rows, the runtime types of the row fields;</li>
 *   <li>otherwise, the value's class.</li>
 * </ol>
 *
 * @throws InvalidTypesException if a tuple field is null
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
private <X> TypeInformation<X> privateGetForObject(X value) {
	checkNotNull(value);

	// a registered factory takes precedence over everything else
	final ArrayList<Type> factoryHierarchy = new ArrayList<>();
	factoryHierarchy.add(value.getClass());
	final TypeInformation<X> typeFromFactory = createTypeInfoFromFactory(value.getClass(), factoryHierarchy, null, null);
	if (typeFromFactory != null) {
		return typeFromFactory;
	}

	if (value instanceof Tuple) {
		final Tuple tuple = (Tuple) value;
		final int arity = tuple.getArity();
		if (arity != countFieldsInClass(value.getClass())) {
			// The field count differs from the tuple arity, so this is not a plain tuple.
			// Only the POJO analysis can handle such a class.
			return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null);
		}
		final TypeInformation<?>[] fieldInfos = new TypeInformation[arity];
		for (int pos = 0; pos < arity; pos++) {
			final Object fieldValue = tuple.getField(pos);
			if (fieldValue == null) {
				throw new InvalidTypesException("Automatic type extraction is not possible on candidates with null values. "
					+ "Please specify the types directly.");
			}
			fieldInfos[pos] = privateGetForObject(fieldValue);
		}
		return new TupleTypeInfo(value.getClass(), fieldInfos);
	}

	if (value instanceof Row) {
		final Row row = (Row) value;
		final int arity = row.getArity();
		// check all fields for null before extracting anything
		for (int pos = 0; pos < arity; pos++) {
			if (row.getField(pos) == null) {
				LOG.warn("Cannot extract type of Row field, because of Row field[" + pos + "] is null. " +
					"Should define RowTypeInfo explicitly.");
				return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
			}
		}
		final TypeInformation<?>[] fieldInfos = new TypeInformation<?>[arity];
		for (int pos = 0; pos < arity; pos++) {
			fieldInfos[pos] = TypeExtractor.getForObject(row.getField(pos));
		}
		return (TypeInformation<X>) new RowTypeInfo(fieldInfos);
	}

	// everything else is derived from the class
	return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
}
// ------------------------------------------------------------------------
// Utilities to handle Hadoop's 'Writable' type via reflection
// ------------------------------------------------------------------------
// visible for testing
/**
 * Tells whether the class implements the Hadoop Writable interface, identified by the name
 * {@code HADOOP_WRITABLE_CLASS}, anywhere in its interface or superclass hierarchy. The Writable
 * interface itself is not considered a writable type.
 */
static boolean isHadoopWritable(Class<?> typeClass) {
	final boolean isWritableInterfaceItself = typeClass.getName().equals(HADOOP_WRITABLE_CLASS);
	if (isWritableInterfaceItself) {
		return false;
	}
	// track visited classes/interfaces so that each is inspected at most once
	final HashSet<Class<?>> visited = new HashSet<>();
	visited.add(typeClass);
	return hasHadoopWritableInterface(typeClass, visited);
}
/**
 * Depth-first search for an interface named {@code HADOOP_WRITABLE_CLASS}. It checks the class's
 * interfaces first, then its superclass. Each type is only descended into when it can be added
 * to {@code alreadySeen}.
 */
private static boolean hasHadoopWritableInterface(Class<?> clazz, HashSet<Class<?>> alreadySeen) {
	for (Class<?> iface : clazz.getInterfaces()) {
		if (iface.getName().equals(HADOOP_WRITABLE_CLASS)) {
			return true;
		}
		if (alreadySeen.add(iface) && hasHadoopWritableInterface(iface, alreadySeen)) {
			return true;
		}
	}
	final Class<?> parent = clazz.getSuperclass();
	if (parent == null || !alreadySeen.add(parent)) {
		return false;
	}
	return hasHadoopWritableInterface(parent, alreadySeen);
}
// visible for testing
/**
 * Creates the type information for a Hadoop Writable class.
 *
 * <p>The type information class named by {@code HADOOP_WRITABLE_TYPEINFO_CLASS} is loaded
 * reflectively, without initialization, via the class loader of {@link TypeExtractor}. Its
 * constructor taking a {@code Class} is then invoked with {@code clazz}.
 *
 * @param clazz the writable class, must not be null
 * @return the type information for {@code clazz}
 * @throws RuntimeException if the type information class is missing or incompatible, or its
 *                          constructor fails; the underlying exception is attached as the cause
 */
public static <T> TypeInformation<T> createHadoopWritableTypeInfo(Class<T> clazz) {
	checkNotNull(clazz);

	Class<?> typeInfoClass;
	try {
		typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
	}
	catch (ClassNotFoundException e) {
		throw new RuntimeException("Could not load the TypeInformation for the class '"
			+ HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.", e);
	}

	try {
		Constructor<?> constr = typeInfoClass.getConstructor(Class.class);

		@SuppressWarnings("unchecked")
		TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz);
		return typeInfo;
	}
	catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
		// keep the cause: it tells which member was missing or inaccessible
		throw new RuntimeException("Incompatible versions of the Hadoop Compatibility classes found.", e);
	}
	catch (InvocationTargetException e) {
		throw new RuntimeException("Cannot create Hadoop WritableTypeInfo.", e.getTargetException());
	}
}
// visible for testing
/**
 * Validates that {@code type} is compatible with a given writable type information.
 *
 * <p>If {@code typeInfo} is an instance of the class named by
 * {@code HADOOP_WRITABLE_TYPEINFO_CLASS}, {@code type} must satisfy two conditions:
 * <ul>
 *   <li>it is a Hadoop Writable class;</li>
 *   <li>it is exactly the type class of {@code typeInfo}.</li>
 * </ul>
 * If that type information class cannot be loaded, nothing is checked.
 *
 * @throws InvalidTypesException if the type does not match the writable type information
 */
static void validateIfWritable(TypeInformation<?> typeInfo, Type type) {
	try {
		// try to load the writable type info
		final Class<?> writableTypeInfoClass = Class
			.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, typeInfo.getClass().getClassLoader());

		if (!writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) {
			// not a writable type info, nothing to validate
			return;
		}

		// the type must be a writable class ...
		final boolean isWritableClass = type instanceof Class && isHadoopWritable((Class<?>) type);
		if (!isWritableClass) {
			throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected.");
		}

		// ... and exactly the class described by the type info
		final Class<?> actualClass = (Class<?>) type;
		if (typeInfo.getTypeClass() != actualClass) {
			throw new InvalidTypesException("Writable type '"
				+ typeInfo.getTypeClass().getCanonicalName() + "' expected but was '"
				+ actualClass.getCanonicalName() + "'.");
		}
	}
	catch (ClassNotFoundException ignored) {
		// the writable type info class is not present at all, so typeInfo cannot be one
	}
}
}