blob: 1a8f2c22695c7e6a595dde206a7cccccf42a1f06 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.giraph.writable.kryo;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import com.esotericsoftware.kryo.util.DefaultClassResolver;
import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
import org.apache.giraph.conf.GiraphConfigurationSettable;
import com.esotericsoftware.kryo.ClassResolver;
import com.esotericsoftware.kryo.ReferenceResolver;
import com.esotericsoftware.kryo.util.MapReferenceResolver;
import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
import org.apache.giraph.types.ops.collections.BasicSet;
import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
import org.apache.giraph.writable.kryo.serializers.ImmutableBiMapSerializerUtils;
import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.objenesis.strategy.StdInstantiatorStrategy;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.SerializerFactory;
import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import com.esotericsoftware.kryo.serializers.ClosureSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.util.ObjectMap;
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
* Kryo instance that provides serialization through DataInput/DataOutput
* that uses.
* All public APIs are static.
* It extends Kryo to reuse KryoPool functionality, but have additional needed
* objects cached as well. If we move to ThreadLocal or other caching
* technique, we can use composition, instead of inheritance here.
* TODO: Refactor this class into two separate classes depending on
* whether the reference tracking is enabled or disabled.
public class HadoopKryo extends Kryo {
/** Pool of reusable Kryo objects, since they are expensive to create */
private static final KryoPool KRYO_POOL = new KryoPool.Builder(
new KryoFactory() {
public Kryo create() {
return createKryo(true, true);
/** Thread local HadoopKryo object */
private static final ThreadLocal<HadoopKryo> KRYO =
new ThreadLocal<HadoopKryo>() {
@Override protected HadoopKryo initialValue() {
return createKryo(false, false);
* List of interfaces/parent classes that will not be allowed to be
* serialized, together with explanation of why, that will be shown
* when throwing such exception
private static final Map<Class<?>, String> NON_SERIALIZABLE;
static {
NON_SERIALIZABLE = new LinkedHashMap<>();
"it is marked to not allow serialization, " +
"look at the class for more details");
KryoWritableWrapper.class, "recursion is disallowed");
"it cannot be supported since it contains ClassLoader");
GiraphConfigurationSettable.class, "configuration cannot be set");
Configurable.class, "configuration cannot be set");
"it should be rarely serialized, since it would create same stream " +
"of numbers everywhere, use TransientRandom instead");
"Logger must be a static field");
/** Reusable Input object */
private InputChunked input;
/** Reusable Output object */
private OutputChunked output;
/** Reusable DataInput wrapper stream */
private DataInputWrapperStream dataInputWrapperStream;
/** Reusable DataOutput wrapper stream */
private DataOutputWrapperStream dataOutputWrapperStream;
* Map of already initialized serializers used
* for readIntoObject/writeOutOfObject pair of methods
private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
classToIntoSerializer = new ObjectMap<>();
/** Hide constructor, so all access go through pool of cached objects */
private HadoopKryo() {
* Constructor that takes custom class resolver and reference resolver.
* @param classResolver Class resolver
* @param referenceResolver Reference resolver
private HadoopKryo(ClassResolver classResolver,
ReferenceResolver referenceResolver) {
super(classResolver, referenceResolver);
// Public API:
* Write type of given object and the object itself to the output stream.
* Inverse of readClassAndObj.
* @param out Output stream
* @param object Object to write
public static void writeClassAndObj(
final DataOutput out, final Object object) {
writeInternal(out, object, false);
* Read object from the input stream, by reading first type of the object,
* and then all of its fields.
* Inverse of writeClassAndObject.
* @param in Input stream
* @return Deserialized object
* @param <T> Type of the object being read
public static <T> T readClassAndObj(DataInput in) {
return readInternal(in, null, false);
* Write an object to output, in a way that can be read by readIntoObject.
* @param out Output stream
* @param object Object to be written
public static void writeOutOfObject(
final DataOutput out, final Object object) {
writeInternal(out, object, true);
* Reads an object, from input, into a given object,
* allowing object reuse.
* Inverse of writeOutOfObject.
* @param in Input stream
* @param object Object to fill from input
public static void readIntoObject(DataInput in, Object object) {
readInternal(in, object, true);
* Writes class and object to specified output stream with specified
* Kryo object. It does not use interim buffers.
* @param kryo Kryo object
* @param out Output stream
* @param object Object
public static void writeWithKryo(
final HadoopKryo kryo, final Output out,
final Object object) {
kryo.writeClassAndObject(out, object);
* Write out of object with given kryo
* @param kryo Kryo object
* @param out Output
* @param object Object to write
public static void writeWithKryoOutOfObject(
final HadoopKryo kryo, final Output out,
final Object object) {
kryo.writeOutOfObject(out, object);
* Reads class and object from specified input stream with
* specified kryo object.
* it does not use interim buffers.
* @param kryo Kryo object
* @param in Input buffer
* @param <T> Object type parameter
* @return Object
public static <T> T readWithKryo(
final HadoopKryo kryo, final Input in) {
T object;
object = (T) kryo.readClassAndObject(in);
return object;
* Read into object with given kryo.
* @param kryo Kryo object
* @param in Input
* @param object Object to read into
public static void readWithKryoIntoObject(
final HadoopKryo kryo, final Input in, Object object) {
kryo.readIntoObject(in, object);
* Create copy of the object, by magically recursively copying
* all of its fields, keeping reference structures (like cycles)
* @param object Object to be copied
* @return Copy of the object.
* @param <T> Type of the object
public static <T> T createCopy(final T object) {
return KryoCallback<T>() {
public T execute(Kryo kryo) {
return kryo.copy(object);
* Returns a kryo which doesn't track objects, hence
* serialization of recursive/nested objects is not
* supported.
* Reference tracking significantly degrades the performance
* since kryo has to store all serialized objects and search
* the history to check if an object has been already serialized.
* @return Hadoop kryo which doesn't track objects.
public static HadoopKryo getNontrackingKryo() {
return KRYO.get();
// Private implementation:
* Create new instance of HadoopKryo, properly initialized.
* @param trackReferences if true, object references are tracked.
* @param hasBuffer if true, an interim buffer is used.
* @return new HadoopKryo instance
private static HadoopKryo createKryo(boolean trackReferences,
boolean hasBuffer) {
HadoopKryo kryo;
if (trackReferences) {
kryo = new HadoopKryo();
} else {
// Only use GiraphClassResolver if it is properly initialized.
// This is to enable test cases which use KryoSimpleWrapper
// but don't start ZK.
kryo = new HadoopKryo(
GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
new DefaultClassResolver(),
new MapReferenceResolver());
try {
new ClosureSerializer());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(
"Trying to use Kryo on Java version " +
System.getProperty("java.version") +
", but unable to find needed classes", e);
kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
kryo.register(Collections.nCopies(1, new Object()).getClass(),
new CollectionsNCopiesSerializer());
// There are many fastutil classes, register them at the end,
// so they don't use up small registration numbers
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
new StdInstantiatorStrategy()));
SerializerFactory customSerializerFactory = new SerializerFactory() {
public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
for (final Entry<Class<?>, String> entry :
if (entry.getKey().isAssignableFrom(type)) {
// Allow Class object to be serialized, but not a live instance.
return new Serializer() {
public Object read(Kryo kryo, Input input, Class type) {
throw new RuntimeException("Cannot serialize " + type +
". Objects being serialized cannot capture " +
entry.getKey() + " because " + entry.getValue() +
". Either remove field in question" +
", or make it transient (so that it isn't serialized)");
public void write(Kryo kryo, Output output, Object object) {
throw new RuntimeException("Cannot serialize " + type +
". Objects being serialized cannot capture " +
entry.getKey() + " because " + entry.getValue() +
". Either remove field in question" +
", or make it transient (so that it isn't serialized)");
if (Writable.class.isAssignableFrom(type) &&
!KryoIgnoreWritable.class.isAssignableFrom(type) &&
// remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
// for lack of constructors
!BasicSet.class.isAssignableFrom(type) &&
!Basic2ObjectMap.class.isAssignableFrom(type)) {
// use the Writable method defined by the type
DirectWritableSerializer serializer = new DirectWritableSerializer();
return serializer;
} else {
FieldSerializer serializer = new FieldSerializer<>(kryo, type);
return serializer;
kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
if (hasBuffer) {
kryo.input = new InputChunked(4096);
kryo.output = new OutputChunked(4096);
kryo.dataInputWrapperStream = new DataInputWrapperStream();
kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
if (!trackReferences) {
// Auto reset can only be disabled if the GiraphClassResolver is
// properly initialized.
if (GiraphClassResolver.isInitialized()) {
return kryo;
* Register serializer for class with class name
* @param kryo HadoopKryo
* @param className Name of the class for which to register serializer
* @param serializer Serializer to use
private static void registerSerializer(HadoopKryo kryo, String className,
Serializer serializer) {
try {
kryo.register(Class.forName(className), serializer);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Class " + className + " is missing", e);
* Initialize reusable objects for reading from given DataInput.
* @param in Input stream
private void setDataInput(DataInput in) {
* Initialize reusable objects for writing into given DataOutput.
* @param out Output stream
private void setDataOutput(DataOutput out) {
* Get or create reusable serializer for given class.
* @param type Type of the object
* @return Serializer
private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
Class<?> type) {
ReusableFieldSerializer<Object> serializer =
if (serializer == null) {
serializer = new ReusableFieldSerializer<>(this, type);
classToIntoSerializer.put(type, serializer);
return serializer;
* Internal write implementation, that reuses HadoopKryo objects
* from the pool.
* @param out Output stream
* @param object Object to be written
* @param outOf whether we are writing reusable objects,
* or full objects with class name
private static void writeInternal(
final DataOutput out, final Object object, final boolean outOf) { KryoCallback<Void>() {
public Void execute(Kryo kryo) {
HadoopKryo hkryo = (HadoopKryo) kryo;
if (outOf) {
hkryo.writeOutOfObject(hkryo.output, object);
} else {
hkryo.writeClassAndObject(hkryo.output, object);
return null;
* Internal read implementation, that reuses HadoopKryo objects
* from the pool.
* @param in Input stream
* @param outObject Object to fill from input (if not null)
* @param into whether we are reading reusable objects,
* or full objects with class name
* @return Read object (new one, or same passed in if we use reusable)
* @param <T> Type of the object to read
private static <T> T readInternal(
final DataInput in, final T outObject, final boolean into) {
return KryoCallback<T>() {
public T execute(Kryo kryo) {
HadoopKryo hkryo = (HadoopKryo) kryo;
T object;
if (into) {
hkryo.readIntoObject(hkryo.input, outObject);
object = outObject;
} else {
object = (T) hkryo.readClassAndObject(hkryo.input);
return object;
* Reads an object, from input, into a given object,
* allowing object reuse.
* @param input Input stream
* @param object Object to fill from input
private void readIntoObject(Input input, Object object) {
Class<?> type = object.getClass();
ReusableFieldSerializer<Object> serializer =
Object result = readObject(input, type, serializer);
Preconditions.checkState(result == object);
* Write an object to output, in a way that can be read
* using readIntoObject.
* @param output Output stream
* @param object Object to be written
private void writeOutOfObject(Output output, Object object) {
ReusableFieldSerializer<Object> serializer =
writeObject(output, object, serializer);