blob: 644ee50d361724c77a4e733bf5fb0f12e1dd6494 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.formats.avro.typeutils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.avro.specific.SpecificRecordBase;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs).
*
* <p>Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>} with a {@code GenericType<avro.Utf8>}.
* All other types used by Avro are standard Java types.
* Only strings are declared as CharSequence fields and represented as Utf8 instances at runtime.
* CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
* with generic type infos containing Utf8 classes (which are comparable).
*
* <p>This class is checked by the AvroPojoTest.
*/
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {

    private static final long serialVersionUID = 1L;

    /**
     * Serializer mode of the outermost {@link #generateFieldsFromAvroSchema(Class, boolean)} call
     * that is currently in progress, keyed by the thread running it. An entry is present only
     * while such a call is running; the outermost call removes it again when it finishes.
     */
    private static final ConcurrentHashMap<Thread, Boolean> IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>();

    /** Whether {@link #createSerializer(ExecutionConfig)} hands out the backwards compatible serializer. */
    private final boolean useBackwardsCompatibleSerializer;

    /**
     * Creates a new Avro type info for the given class.
     */
    public AvroTypeInfo(Class<T> typeClass) {
        this(typeClass, false);
    }

    /**
     * Creates a new Avro type info for the given class.
     *
     * <p>This constructor takes a flag to specify whether a serializer
     * that is backwards compatible with PoJo-style serialization of Avro types should be used.
     * That is only necessary, if one has a Flink 1.3 (or earlier) savepoint where Avro types
     * were stored in the checkpointed state. New Flink programs will never need this.
     */
    public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer) {
        super(typeClass, generateFieldsFromAvroSchema(typeClass, useBackwardsCompatibleSerializer));

        // If a field analysis started further up the call stack of this thread is still running,
        // its mode takes precedence over the constructor argument.
        // NOTE(review): presumably this is the case when this type info is created for a nested
        // Avro type while an enclosing Avro type is being analyzed - confirm against the extractor.
        final Boolean modeOnStack = IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread());
        this.useBackwardsCompatibleSerializer = modeOnStack == null ?
                useBackwardsCompatibleSerializer : modeOnStack;
    }

    /**
     * Creates the serializer for the Avro type.
     *
     * @param config the execution config (not consulted by this implementation)
     * @return a {@code BackwardsCompatibleAvroSerializer} if this type info is in backwards
     *         compatible mode, otherwise an {@code AvroSerializer}
     */
    @Override
    @SuppressWarnings("deprecation")
    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
        return useBackwardsCompatibleSerializer ?
                new BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
                new AvroSerializer<>(getTypeClass());
    }

    /**
     * Runs the regular pojo analysis on the given Avro class and returns its fields, with every
     * field typed as {@code GenericTypeInfo<CharSequence>} re-typed as
     * {@code GenericTypeInfo<org.apache.avro.util.Utf8>}.
     *
     * <p>While the analysis runs, the given mode flag is published for the current thread
     * (unless a flag is already present), so that {@code AvroTypeInfo} instances constructed on
     * the same thread during the analysis pick it up. The flag is cleared again by the call
     * that published it, also when the analysis fails.
     *
     * @param typeClass the Avro specific record class to analyze
     * @param useBackwardsCompatibleSerializer the serializer mode to publish for the duration of the analysis
     * @return the pojo fields of the class, in the order reported by the pojo analysis
     * @throws IllegalStateException if the pojo analysis does not yield a {@link PojoTypeInfo}
     */
    @Internal
    public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(
            Class<T> typeClass,
            boolean useBackwardsCompatibleSerializer) {

        final Thread currentThread = Thread.currentThread();

        // only the call that actually installed the flag is responsible for removing it again
        final boolean entryPoint =
                IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, useBackwardsCompatibleSerializer) == null;

        try {
            final PojoTypeExtractor pte = new PojoTypeExtractor();
            final ArrayList<Type> typeHierarchy = new ArrayList<>();
            typeHierarchy.add(typeClass);

            final TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
            if (!(ti instanceof PojoTypeInfo)) {
                throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
            }
            final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

            // exactly one entry per direct field is added below, so size the list by the arity
            final int numFields = pti.getArity();
            final List<PojoField> newFields = new ArrayList<>(numFields);

            for (int i = 0; i < numFields; i++) {
                final PojoField f = pti.getPojoFieldAt(i);
                final TypeInformation<?> newType = replaceCharSequenceWithUtf8(f.getTypeInformation());
                newFields.add(new PojoField(f.getField(), newType));
            }
            return newFields;
        }
        finally {
            if (entryPoint) {
                IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread);
            }
        }
    }

    /**
     * Returns a generic type info for {@code org.apache.avro.util.Utf8} if the given type is a
     * generic type info for {@link CharSequence}; otherwise returns the given type unchanged.
     */
    private static TypeInformation<?> replaceCharSequenceWithUtf8(TypeInformation<?> type) {
        if (type instanceof GenericTypeInfo && type.getTypeClass().equals(CharSequence.class)) {
            return new GenericTypeInfo<>(org.apache.avro.util.Utf8.class);
        }
        return type;
    }

    /**
     * Type extractor subclass that re-declares {@code analyzePojo} as public and delegates to
     * the parent implementation, so the enclosing class can invoke it.
     * NOTE(review): presumably the parent method is not otherwise accessible from this package.
     */
    private static class PojoTypeExtractor extends TypeExtractor {

        private PojoTypeExtractor() {
            super();
        }

        @Override
        public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
                ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
            return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type);
        }
    }
}