blob: d8158aabb92db8d95f6723f752e9300d6f20f352 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime.kryo;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.AvroUtils;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.commons.lang3.exception.CloneFailedException;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A type serializer that serializes its type using the Kryo serialization
 * framework (https://github.com/EsotericSoftware/kryo).
 *
 * <p>This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * <p>An instance lazily creates one Kryo instance and reuses single Kryo
 * {@link Input}/{@link Output} buffers, so it must not be used by multiple threads at the
 * same time (a debug-mode sanity check detects some violations). {@link #duplicate()} creates
 * a copy that does not share this lazily created state.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

	private static final long serialVersionUID = 3L;

	private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

	/** Flag whether to check for concurrent thread access.
	 * Because this flag is static final, a value of 'false' allows the JIT compiler to eliminate
	 * the guarded code sections. */
	private static final boolean CONCURRENT_ACCESS_CHECK =
			LOG.isDebugEnabled() || KryoSerializerDebugInitHelper.setToDebug;

	static {
		configureKryoLogging();
	}

	// ------------------------------------------------------------------------

	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;

	/**
	 * Map of class tag (using classname as tag) to their Kryo registration.
	 *
	 * <p>This map serves as a preview of the final registration result of
	 * the Kryo instance, taking into account registration overwrites.
	 */
	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;

	private final Class<T> type;

	// ------------------------------------------------------------------------
	// The fields below are lazily initialized after duplication or deserialization.

	private transient Kryo kryo;
	private transient T copyInstance;

	private transient DataOutputView previousOut;
	private transient DataInputView previousIn;

	private transient Input input;
	private transient Output output;

	// ------------------------------------------------------------------------
	// legacy fields; these fields cannot yet be removed to retain backwards compatibility

	private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
	private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
	private LinkedHashSet<Class<?>> registeredTypes;

	// for debugging purposes
	private transient volatile Thread currentThread;

	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for {@code type}, taking the default serializers and the
	 * type registrations from the given execution config.
	 *
	 * @param type the class to serialize; must not be null
	 * @param executionConfig source of default Kryo serializers and Kryo type registrations
	 */
	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
		this.type = checkNotNull(type);

		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();

		this.kryoRegistrations = buildKryoRegistrations(
				this.type,
				executionConfig.getRegisteredKryoTypes(),
				executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
				executionConfig.getRegisteredTypesWithKryoSerializers());
	}

	/**
	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
	 *
	 * <p>Serializer instances held in {@code defaultSerializers} and in INSTANCE-type Kryo
	 * registrations are deep-copied (via Java serialization) so the copy does not share
	 * stateful Kryo serializers with the original.
	 */
	protected KryoSerializer(KryoSerializer<T> toCopy) {

		this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
		this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
		this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
		this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());

		// deep copy the serializer instances in defaultSerializers
		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
				toCopy.defaultSerializers.entrySet()) {

			this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
		}

		// deep copy the serializer instances in kryoRegistrations
		for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {

			KryoRegistration kryoRegistration = entry.getValue();

			if (kryoRegistration.getSerializerDefinitionType() == KryoRegistration.SerializerDefinitionType.INSTANCE) {

				ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializerInstance =
						kryoRegistration.getSerializableSerializerInstance();

				if (serializerInstance != null) {
					kryoRegistration = new KryoRegistration(
							kryoRegistration.getRegisteredClass(),
							deepCopySerializer(serializerInstance));
				}
			}

			this.kryoRegistrations.put(entry.getKey(), kryoRegistration);
		}
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean isImmutableType() {
		return false;
	}

	@Override
	public KryoSerializer<T> duplicate() {
		return new KryoSerializer<>(this);
	}

	/**
	 * Creates a new instance of the serialized type through Kryo.
	 *
	 * @return a new instance, or {@code null} if the type is abstract/an interface or if Kryo
	 *         fails to instantiate it (any {@link Throwable} is swallowed here)
	 */
	@Override
	public T createInstance() {
		if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
			return null;
		} else {
			checkKryoInitialized();
			try {
				return kryo.newInstance(type);
			} catch(Throwable e) {
				return null;
			}
		}
	}

	/**
	 * Copies {@code from} with {@link Kryo#copy}; if Kryo cannot copy it, falls back to a
	 * serialize/deserialize round trip through an in-memory byte buffer.
	 *
	 * @param from the record to copy, may be null
	 * @return the copy, or {@code null} if {@code from} is null
	 */
	@SuppressWarnings("unchecked")
	@Override
	public T copy(T from) {
		if (from == null) {
			return null;
		}

		if (CONCURRENT_ACCESS_CHECK) {
			enterExclusiveThread();
		}

		try {
			checkKryoInitialized();
			try {
				return kryo.copy(from);
			}
			catch (KryoException ke) {
				// kryo was unable to copy it, so we do it through serialization.
				// The local buffers are named so that they do not shadow the 'input'/'output'
				// fields, which stay bound to the last seen DataInputView/DataOutputView.
				ByteArrayOutputStream bytes = new ByteArrayOutputStream();
				try (Output bufferOutput = new Output(bytes)) {
					kryo.writeObject(bufferOutput, from);
				}
				// closing the Output above flushed its buffer into 'bytes'
				try (Input bufferInput = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
					return (T) kryo.readObject(bufferInput, from.getClass());
				}
			}
		}
		finally {
			if (CONCURRENT_ACCESS_CHECK) {
				exitExclusiveThread();
			}
		}
	}

	@Override
	public T copy(T from, T reuse) {
		return copy(from);
	}

	@Override
	public int getLength() {
		return -1;
	}

	/**
	 * Writes the record's class and the record itself to {@code target}.
	 *
	 * <p>A Kryo {@link Output} is created once per distinct target view and reused while the
	 * same view is passed in.
	 *
	 * @throws EOFException if Kryo reports an {@link EOFException} as cause (e.g. target buffer full)
	 * @throws IllegalStateException if the reused Kryo output still holds unflushed data
	 */
	@Override
	public void serialize(T record, DataOutputView target) throws IOException {
		if (CONCURRENT_ACCESS_CHECK) {
			enterExclusiveThread();
		}

		try {
			checkKryoInitialized();

			if (target != previousOut) {
				DataOutputViewStream outputStream = new DataOutputViewStream(target);
				output = new Output(outputStream);
				previousOut = target;
			}

			// Sanity check: Make sure that the output is cleared/has been flushed by the last call
			// otherwise data might be written multiple times in case of a previous EOFException
			if (output.position() != 0) {
				throw new IllegalStateException("The Kryo Output still contains data from a previous " +
					"serialize call. It has to be flushed or cleared at the end of the serialize call.");
			}

			try {
				kryo.writeClassAndObject(output, record);
				output.flush();
			}
			catch (KryoException ke) {
				// make sure that the Kryo output buffer is cleared in case that we can recover from
				// the exception (e.g. EOFException which denotes buffer full)
				output.clear();

				Throwable cause = ke.getCause();
				if (cause instanceof EOFException) {
					throw (EOFException) cause;
				}
				else {
					throw ke;
				}
			}
		}
		finally {
			if (CONCURRENT_ACCESS_CHECK) {
				exitExclusiveThread();
			}
		}
	}

	/**
	 * Reads a class tag and record from {@code source}.
	 *
	 * <p>A {@link NoFetchingInput} is created once per distinct source view and reused while
	 * the same view is passed in.
	 *
	 * @throws EOFException if Kryo reports an {@link EOFException} as cause
	 */
	@SuppressWarnings("unchecked")
	@Override
	public T deserialize(DataInputView source) throws IOException {
		if (CONCURRENT_ACCESS_CHECK) {
			enterExclusiveThread();
		}

		try {
			checkKryoInitialized();

			if (source != previousIn) {
				DataInputViewStream inputStream = new DataInputViewStream(source);
				input = new NoFetchingInput(inputStream);
				previousIn = source;
			}

			try {
				return (T) kryo.readClassAndObject(input);
			} catch (KryoException ke) {
				Throwable cause = ke.getCause();

				if (cause instanceof EOFException) {
					throw (EOFException) cause;
				} else {
					throw ke;
				}
			}
		}
		finally {
			if (CONCURRENT_ACCESS_CHECK) {
				exitExclusiveThread();
			}
		}
	}

	@Override
	public T deserialize(T reuse, DataInputView source) throws IOException {
		return deserialize(source);
	}

	/**
	 * Copies one record from {@code source} to {@code target} by deserializing and
	 * re-serializing it.
	 */
	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		if (CONCURRENT_ACCESS_CHECK) {
			enterExclusiveThread();
		}

		try {
			checkKryoInitialized();
			if (this.copyInstance == null){
				this.copyInstance = createInstance();
			}

			// NOTE: deserialize/serialize each run their own enter/exit of the exclusive-thread
			// check, which clears the marker before this method finishes; the check is best-effort.
			T tmp = deserialize(copyInstance, source);
			serialize(tmp, target);
		}
		finally {
			if (CONCURRENT_ACCESS_CHECK) {
				exitExclusiveThread();
			}
		}
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public int hashCode() {
		int result = type.hashCode();
		result = 31 * result + (kryoRegistrations.hashCode());
		result = 31 * result + (defaultSerializers.hashCode());
		result = 31 * result + (defaultSerializerClasses.hashCode());

		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof KryoSerializer) {
			KryoSerializer<?> other = (KryoSerializer<?>) obj;

			return other.canEqual(this) &&
				type == other.type &&
				Objects.equals(kryoRegistrations, other.kryoRegistrations) &&
				Objects.equals(defaultSerializerClasses, other.defaultSerializerClasses) &&
				Objects.equals(defaultSerializers, other.defaultSerializers);
		} else {
			return false;
		}
	}

	@Override
	public boolean canEqual(Object obj) {
		return obj instanceof KryoSerializer;
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
	 * Falls back to the default Kryo serializer if it can't be found.
	 * @return The Kryo serializer instance.
	 */
	private Kryo getKryoInstance() {

		try {
			// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
			// This will be true if Flink's Scala API is used.
			Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
			Object chillInstantiator = chillInstantiatorClazz.newInstance();

			// obtain a Kryo instance through Twitter Chill
			Method m = chillInstantiatorClazz.getMethod("newKryo");

			return (Kryo) m.invoke(chillInstantiator);
		} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
			IllegalAccessException | InvocationTargetException e) {

			LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

			// plain Kryo: try a no-arg constructor first, otherwise fall back to Objenesis
			Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
			initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

			Kryo kryo = new Kryo();
			kryo.setInstantiatorStrategy(initStrategy);

			return kryo;
		}
	}

	/**
	 * Lazily creates and configures the Kryo instance (reference tracking, default serializers,
	 * type registrations, class loader) if it does not exist yet.
	 */
	private void checkKryoInitialized() {
		if (this.kryo == null) {
			this.kryo = getKryoInstance();

			// Enable reference tracking.
			kryo.setReferences(true);

			// Throwable and all subclasses should be serialized via java serialization
			// Note: the registered JavaSerializer is Flink's own implementation, and not Kryo's.
			//       This is due to a known issue with Kryo's JavaSerializer. See FLINK-6025 for details.
			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

			// Add default serializers first, so that the type registrations without a serializer
			// are registered with a default serializer
			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
				kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
			}

			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
				kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
			}

			KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());

			kryo.setRegistrationRequired(false);
			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
		}
	}

	// --------------------------------------------------------------------------------------------
	// Serializer configuration snapshotting & compatibility
	// --------------------------------------------------------------------------------------------

	@Override
	public KryoSerializerConfigSnapshot<T> snapshotConfiguration() {
		return new KryoSerializerConfigSnapshot<>(type, kryoRegistrations);
	}

	/**
	 * Checks compatibility with a previous configuration and, if compatible, reconfigures this
	 * serializer so that previously registered classes keep their original registration order.
	 *
	 * <p>The restored snapshot's registration map is copied rather than modified in place, and
	 * any already-initialized Kryo instance is discarded so that the next use re-initializes
	 * Kryo with the reconfigured registrations.
	 */
	@SuppressWarnings("unchecked")
	@Override
	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
		if (configSnapshot instanceof KryoSerializerConfigSnapshot) {
			final KryoSerializerConfigSnapshot<T> config = (KryoSerializerConfigSnapshot<T>) configSnapshot;

			if (type.equals(config.getTypeClass())) {
				// copy, so that the restored snapshot itself is left untouched
				LinkedHashMap<String, KryoRegistration> reconfiguredRegistrations =
						new LinkedHashMap<>(config.getKryoRegistrations());

				// reconfigure by assuring that classes which were previously registered are registered
				// again in the exact same order; new class registrations will be appended.
				// this also overwrites any dummy placeholders that the restored old configuration has
				reconfiguredRegistrations.putAll(kryoRegistrations);

				// check if there is still any dummy placeholders even after reconfiguration;
				// if so, then this new Kryo serializer cannot read old data and is therefore incompatible
				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : reconfiguredRegistrations.entrySet()) {
					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
						LOG.warn("The Kryo registration for a previously registered class {} does not have a " +
							"proper serializer, because its previous serializer cannot be loaded or is no " +
							"longer valid but a new serializer is not available", reconfiguredRegistrationEntry.getKey());

						return CompatibilityResult.requiresMigration();
					}
				}

				// there's actually no way to tell if new Kryo serializers are compatible with
				// the previous ones they overwrite; we can only signal compatibility and hope for the best
				this.kryoRegistrations = reconfiguredRegistrations;

				// registrations are only applied when Kryo is initialized; drop any existing
				// instance so that the reconfigured registration order actually takes effect
				this.kryo = null;

				return CompatibilityResult.compatible();
			}
		}

		return CompatibilityResult.requiresMigration();
	}

	/**
	 * Configuration snapshot of a {@link KryoSerializer}: the serialized type class and its
	 * Kryo registrations.
	 */
	public static final class KryoSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {

		private static final int VERSION = 1;

		/** This empty nullary constructor is required for deserializing the configuration. */
		public KryoSerializerConfigSnapshot() {}

		public KryoSerializerConfigSnapshot(
				Class<T> typeClass,
				LinkedHashMap<String, KryoRegistration> kryoRegistrations) {

			super(typeClass, kryoRegistrations);
		}

		@Override
		public int getVersion() {
			return VERSION;
		}
	}

	// --------------------------------------------------------------------------------------------
	// Utilities
	// --------------------------------------------------------------------------------------------

	/**
	 * Utility method that takes lists of registered types and their serializers, and resolve
	 * them into a single list such that the result will resemble the final registration
	 * result in Kryo.
	 *
	 * <p>Later entries overwrite earlier ones with the same class name, in this order: the
	 * serialized type, plain registered types, types with serializer classes, types with
	 * serializer instances, and finally the Avro registration added by {@link AvroUtils}.
	 */
	private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
			Class<?> serializedType,
			LinkedHashSet<Class<?>> registeredTypes,
			LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses,
			LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers) {

		final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>();

		kryoRegistrations.put(serializedType.getName(), new KryoRegistration(serializedType));

		for (Class<?> registeredType : checkNotNull(registeredTypes)) {
			kryoRegistrations.put(registeredType.getName(), new KryoRegistration(registeredType));
		}

		for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> registeredTypeWithSerializerClassEntry :
				checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {

			kryoRegistrations.put(
					registeredTypeWithSerializerClassEntry.getKey().getName(),
					new KryoRegistration(
							registeredTypeWithSerializerClassEntry.getKey(),
							registeredTypeWithSerializerClassEntry.getValue()));
		}

		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypeWithSerializerEntry :
				checkNotNull(registeredTypesWithSerializers).entrySet()) {

			kryoRegistrations.put(
					registeredTypeWithSerializerEntry.getKey().getName(),
					new KryoRegistration(
							registeredTypeWithSerializerEntry.getKey(),
							registeredTypeWithSerializerEntry.getValue()));
		}

		// add Avro support if flink-avro is available; a dummy otherwise
		AvroUtils.getAvroUtils().addAvroGenericDataArrayRegistration(kryoRegistrations);

		return kryoRegistrations;
	}

	/**
	 * Forwards Kryo's (minlog) logging to this class's logger, but only when TRACE is enabled.
	 */
	static void configureKryoLogging() {
		// Kryo uses only DEBUG and TRACE levels
		// we only forward TRACE level, because even DEBUG levels results in
		// a logging for each object, which is infeasible in Flink.
		if (LOG.isTraceEnabled()) {
			com.esotericsoftware.minlog.Log.setLogger(new MinlogForwarder(LOG));
			com.esotericsoftware.minlog.Log.TRACE();
		}
	}

	// --------------------------------------------------------------------------------------------

	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		in.defaultReadObject();

		// kryoRegistrations may be null if this Kryo serializer is deserialized from an old version
		if (kryoRegistrations == null) {
			this.kryoRegistrations = buildKryoRegistrations(
					type,
					registeredTypes,
					registeredTypesWithSerializerClasses,
					registeredTypesWithSerializers);
		}
	}

	/**
	 * Deep-copies a serializable serializer via Java serialization, using the thread's
	 * context class loader.
	 *
	 * @throws CloneFailedException if the serializer cannot be serialized or deserialized
	 */
	private ExecutionConfig.SerializableSerializer<? extends Serializer<?>> deepCopySerializer(
		ExecutionConfig.SerializableSerializer<? extends Serializer<?>> original) {
		try {
			return InstantiationUtil.clone(original, Thread.currentThread().getContextClassLoader());
		} catch (IOException | ClassNotFoundException ex) {
			throw new CloneFailedException(
				"Could not clone serializer instance of class " + original.getClass(),
				ex);
		}
	}

	// --------------------------------------------------------------------------------------------
	// Concurrent access sanity check (only active when CONCURRENT_ACCESS_CHECK is true)
	// --------------------------------------------------------------------------------------------

	private void enterExclusiveThread() {
		// we use simple get, check, set here, rather than CAS
		// we don't need lock-style correctness, this is only a sanity-check and we thus
		// favor speed at the cost of some false negatives in this check
		Thread previous = currentThread;
		Thread thisThread = Thread.currentThread();

		if (previous == null) {
			currentThread = thisThread;
		}
		else if (previous != thisThread) {
			throw new IllegalStateException(
					"Concurrent access to KryoSerializer. Thread 1: " + thisThread.getName() +
							" , Thread 2: " + previous.getName());
		}
	}

	private void exitExclusiveThread() {
		currentThread = null;
	}

	// --------------------------------------------------------------------------------------------
	// For testing
	// --------------------------------------------------------------------------------------------

	/** Returns the lazily initialized Kryo instance, creating it if necessary. */
	@VisibleForTesting
	public Kryo getKryo() {
		checkKryoInitialized();
		return this.kryo;
	}
}