blob: f3801991be6aec116604565e36d0a2087f33d167 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import com.esotericsoftware.kryo.Kryo;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Old deprecated Avro serializer. It is retained for a smoother experience when
* upgrading from an earlier Flink savepoint that stored this serializer.
*/
@Internal
@Deprecated
@SuppressWarnings({"unused", "deprecation"})
public final class AvroSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
private final Class<T> type;
private final Class<? extends T> typeToInstantiate;
/**
* Map of class tag (using classname as tag) to their Kryo registration.
*
* <p>This map serves as a preview of the final registration result of
* the Kryo instance, taking into account registration overwrites.
*/
private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
private transient ReflectDatumWriter<T> writer;
private transient ReflectDatumReader<T> reader;
private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;
private transient Kryo kryo;
private transient T deepCopyInstance;
// --------------------------------------------------------------------------------------------
public AvroSerializer(Class<T> type) {
this(type, type);
}
public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
this.type = checkNotNull(type);
this.typeToInstantiate = checkNotNull(typeToInstantiate);
InstantiationUtil.checkForInstantiation(typeToInstantiate);
this.kryoRegistrations = buildKryoRegistrations(type);
}
// --------------------------------------------------------------------------------------------
@Override
public boolean isImmutableType() {
return false;
}
@Override
public AvroSerializer<T> duplicate() {
return new AvroSerializer<>(type, typeToInstantiate);
}
@Override
public T createInstance() {
return InstantiationUtil.instantiate(this.typeToInstantiate);
}
@Override
public T copy(T from) {
checkKryoInitialized();
return KryoUtils.copy(from, kryo, this);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
return KryoUtils.copy(from, reuse, kryo, this);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(T value, DataOutputView target) throws IOException {
checkAvroInitialized();
this.encoder.setOut(target);
this.writer.write(value, this.encoder);
}
@Override
public T deserialize(DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(null, this.decoder);
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(reuse, this.decoder);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
checkAvroInitialized();
if (this.deepCopyInstance == null) {
this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
}
this.decoder.setIn(source);
this.encoder.setOut(target);
T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
this.writer.write(tmp, this.encoder);
}
private void checkAvroInitialized() {
if (this.reader == null) {
this.reader = new ReflectDatumReader<>(type);
this.writer = new ReflectDatumWriter<>(type);
this.encoder = new DataOutputEncoder();
this.decoder = new DataInputDecoder();
}
}
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
kryo.setAsmEnabled(true);
KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
}
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof AvroSerializer) {
@SuppressWarnings("unchecked")
AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
return avroSerializer.canEqual(this) &&
type == avroSerializer.type &&
typeToInstantiate == avroSerializer.typeToInstantiate;
} else {
return false;
}
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof AvroSerializer;
}
// --------------------------------------------------------------------------------------------
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------
@Override
public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
}
@SuppressWarnings("unchecked")
@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
// resolve Kryo registrations; currently, since the Kryo registrations in Avro
// are fixed, there shouldn't be a problem with the resolution here.
LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
oldRegistrations.putAll(kryoRegistrations);
for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
if (reconfiguredRegistrationEntry.getValue().isDummy()) {
return CompatibilityResult.requiresMigration();
}
}
this.kryoRegistrations = oldRegistrations;
return CompatibilityResult.compatible();
}
}
// ends up here if the preceding serializer is not
// the ValueSerializer, or serialized data type has changed
return CompatibilityResult.requiresMigration();
}
/**
* Config snapshot for this serializer.
*/
public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
private static final int VERSION = 1;
private Class<? extends T> typeToInstantiate;
public AvroSerializerConfigSnapshot() {}
public AvroSerializerConfigSnapshot(
Class<T> baseType,
Class<? extends T> typeToInstantiate,
LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
super(baseType, kryoRegistrations);
this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
out.writeUTF(typeToInstantiate.getName());
}
@SuppressWarnings("unchecked")
@Override
public void read(DataInputView in) throws IOException {
super.read(in);
String classname = in.readUTF();
try {
typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
}
}
@Override
public int getVersion() {
return VERSION;
}
public Class<? extends T> getTypeToInstantiate() {
return typeToInstantiate;
}
}
// --------------------------------------------------------------------------------------------
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
if (kryoRegistrations == null) {
this.kryoRegistrations = buildKryoRegistrations(type);
}
}
private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
// register Avro types.
registrations.put(
GenericData.Array.class.getName(),
new KryoRegistration(
GenericData.Array.class,
new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
// register the serialized data type
registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
return registrations;
}
}