blob: 61086cd89e88532f34a9cee1564e4f25cbde8159 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.structure.io.gryo;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter;
import org.apache.tinkerpop.shaded.kryo.ClassResolver;
import org.apache.tinkerpop.shaded.kryo.Kryo;
import org.apache.tinkerpop.shaded.kryo.Serializer;
import org.apache.tinkerpop.shaded.kryo.util.DefaultStreamFactory;
import org.apache.tinkerpop.shaded.kryo.util.MapReferenceResolver;
import org.javatuples.Pair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A {@link Mapper} implementation for Kryo. This implementation requires that all classes to be serialized by
* Kryo are registered to it.
* <p/>
* {@link Graph} implementations providing an {@link IoRegistry} should register their custom classes and/or
* serializers in one of three ways:
* <p/>
* <ol>
* <li>Register just the custom class with a {@code null} {@link Serializer} implementation</li>
* <li>Register the custom class with a {@link Serializer} implementation</li>
* <li>
* Register the custom class with a {@code Function<Kryo, Serializer>} for those cases where the
* {@link Serializer} requires the {@link Kryo} instance to get constructed.
* </li>
* </ol>
* <p/>
* For example:
* <pre>
* {@code
* public class MyGraphIoRegistry extends AbstractIoRegistry {
* public MyGraphIoRegistry() {
* register(GryoIo.class, MyGraphIdClass.class, new MyGraphIdSerializer());
* }
* }
* }
* </pre>
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public final class GryoMapper implements Mapper<Kryo> {
public static final byte[] GIO = "gio".getBytes();
public static final byte[] HEADER = Arrays.copyOf(GIO, 16);
private final List<TypeRegistration<?>> typeRegistrations;
private final boolean registrationRequired;
private final boolean referenceTracking;
private final Supplier<ClassResolver> classResolver;
private final GryoVersion version;
private GryoMapper(final Builder builder) {
this.typeRegistrations = builder.typeRegistrations;
this.version = builder.version;
validate();
this.registrationRequired = builder.registrationRequired;
this.referenceTracking = builder.referenceTracking;
this.classResolver = null == builder.classResolver ? version.getClassResolverMaker() : builder.classResolver;
}
@Override
public Kryo createMapper() {
final Kryo kryo = new Kryo(classResolver.get(), new MapReferenceResolver(), new DefaultStreamFactory());
kryo.addDefaultSerializer(Map.Entry.class, new UtilSerializers.EntrySerializer());
kryo.setRegistrationRequired(registrationRequired);
kryo.setReferences(referenceTracking);
for (TypeRegistration tr : typeRegistrations)
tr.registerWith(kryo);
return kryo;
}
public GryoVersion getVersion() {
return version;
}
public List<Class> getRegisteredClasses() {
return this.typeRegistrations.stream().map(TypeRegistration::getTargetClass).collect(Collectors.toList());
}
public List<TypeRegistration<?>> getTypeRegistrations() {
return typeRegistrations;
}
public static Builder build() {
return new Builder();
}
private void validate() {
final Set<Integer> duplicates = new HashSet<>();
final Set<Integer> ids = new HashSet<>();
typeRegistrations.forEach(t -> {
if (!ids.contains(t.getId()))
ids.add(t.getId());
else
duplicates.add(t.getId());
});
if (duplicates.size() > 0)
throw new IllegalStateException("There are duplicate kryo identifiers in use: " + duplicates);
}
/**
* A builder to construct a {@link GryoMapper} instance.
*/
public static class Builder implements Mapper.Builder<Builder> {
private GryoVersion version = GryoVersion.V3_0;
/**
* Note that the following are pre-registered boolean, Boolean, byte, Byte, char, Character, double, Double,
* int, Integer, float, Float, long, Long, short, Short, String, void.
*/
private List<TypeRegistration<?>> typeRegistrations = version.cloneRegistrations();
private final List<IoRegistry> registries = new ArrayList<>();
/**
* Starts numbering classes for Gryo serialization at 65536 to leave room for future usage by TinkerPop.
*/
private final AtomicInteger currentSerializationId = new AtomicInteger(65536);
private boolean registrationRequired = true;
private boolean referenceTracking = true;
private Supplier<ClassResolver> classResolver;
private Builder() {
}
/**
* {@inheritDoc}
*/
@Override
public Builder addRegistry(final IoRegistry registry) {
if (null == registry) throw new IllegalArgumentException("The registry cannot be null");
this.registries.add(registry);
return this;
}
/**
* The version of Gryo to use in the mapper. Defaults to 1.0. Calls to this method will reset values specified
* to {@link #addCustom(Class, Function)} and related overloads.
*/
public Builder version(final GryoVersion version) {
this.version = version;
this.typeRegistrations = version.cloneRegistrations();
return this;
}
/**
* Provides a custom Kryo {@code ClassResolver} to be supplied to a {@code Kryo} instance. If this value is
* not supplied then it will default to the {@code ClassResolver} of the provided {@link GryoVersion}. To
* ensure compatibility with Gryo it is highly recommended that objects passed to this method extend that class.
* <p/>
* If the {@code ClassResolver} implementation share state, then the {@link Supplier} should typically create
* new instances when requested, as the {@link Supplier} will be called for each {@link Kryo} instance created.
*/
public Builder classResolver(final Supplier<ClassResolver> classResolverSupplier) {
if (null == classResolverSupplier)
throw new IllegalArgumentException("The classResolverSupplier cannot be null");
this.classResolver = classResolverSupplier;
return this;
}
/**
* Register custom classes to serializes with gryo using default serialization. Note that calling this method
* for a class that is already registered will override that registration.
*/
public Builder addCustom(final Class... custom) {
if (custom != null && custom.length > 0) {
for (Class c : custom) {
addOrOverrideRegistration(c, id -> GryoTypeReg.of(c, id));
}
}
return this;
}
/**
* Register custom class to serialize with a custom serialization class. Note that calling this method for
* a class that is already registered will override that registration.
*/
public Builder addCustom(final Class clazz, final Serializer serializer) {
addOrOverrideRegistration(clazz, id -> GryoTypeReg.of(clazz, id, serializer));
return this;
}
/**
* Register custom class to serialize with a custom serialization shim.
*/
public Builder addCustom(final Class clazz, final SerializerShim serializer) {
addOrOverrideRegistration(clazz, id -> GryoTypeReg.of(clazz, id, serializer));
return this;
}
/**
* Register a custom class to serialize with a custom serializer as returned from a {@link Function}. Note
* that calling this method for a class that is already registered will override that registration.
*/
public Builder addCustom(final Class clazz, final Function<Kryo, Serializer> functionOfKryo) {
addOrOverrideRegistration(clazz, id -> GryoTypeReg.of(clazz, id, functionOfKryo));
return this;
}
/**
* When set to {@code true}, all classes serialized by the {@code Kryo} instances created from this
* {@link GryoMapper} must have their classes known up front and registered appropriately through this
* builder. By default this value is {@code true}. This approach is more efficient than setting the
* value to {@code false}.
*
* @param registrationRequired set to {@code true} if the classes should be registered up front or
* {@code false} otherwise
*/
public Builder registrationRequired(final boolean registrationRequired) {
this.registrationRequired = registrationRequired;
return this;
}
/**
* By default, each appearance of an object in the graph after the first is stored as an integer ordinal.
* This allows multiple references to the same object and cyclic graphs to be serialized. This has a small
* amount of overhead and can be disabled to save space if it is not needed.
*
* @param referenceTracking set to {@code true} to enable and {@code false} otherwise
*/
public Builder referenceTracking(final boolean referenceTracking) {
this.referenceTracking = referenceTracking;
return this;
}
/**
* Creates a {@code GryoMapper}.
*/
public GryoMapper create() {
// consult the registry if provided and inject registry entries as custom classes.
registries.forEach(registry -> {
final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class);
serializers.forEach(p -> {
if (null == p.getValue1())
addCustom(p.getValue0());
else if (p.getValue1() instanceof SerializerShim)
addCustom(p.getValue0(), new ShadedSerializerAdapter((SerializerShim) p.getValue1()));
else if (p.getValue1() instanceof Serializer)
addCustom(p.getValue0(), (Serializer) p.getValue1());
else if (p.getValue1() instanceof Function)
addCustom(p.getValue0(), (Function<Kryo, Serializer>) p.getValue1());
else
throw new IllegalStateException(String.format(
"Unexpected value provided by %s for serializable class %s - expected a parameter in [null, %s implementation or Function<%s, %s>], but received %s",
registry.getClass().getSimpleName(), p.getValue0().getClass().getCanonicalName(),
Serializer.class.getName(), Kryo.class.getSimpleName(),
Serializer.class.getSimpleName(), p.getValue1()));
});
});
return new GryoMapper(this);
}
private <T> void addOrOverrideRegistration(final Class<?> clazz,
final Function<Integer, TypeRegistration<T>> newRegistrationBuilder) {
final Iterator<TypeRegistration<?>> iter = typeRegistrations.iterator();
Integer registrationId = null;
while (iter.hasNext()) {
final TypeRegistration<?> existingRegistration = iter.next();
if (existingRegistration.getTargetClass().equals(clazz)) {
// when overridding a registration, use the old id
registrationId = existingRegistration.getId();
// remove the old registration (we install its override below)
iter.remove();
break;
}
}
if (null == registrationId) {
// when not overridding a registration, get an id from the counter
registrationId = currentSerializationId.getAndIncrement();
}
typeRegistrations.add(newRegistrationBuilder.apply(registrationId));
}
}
private static class GryoTypeReg<T> implements TypeRegistration<T> {
private final Class<T> clazz;
private final Serializer<T> shadedSerializer;
private final SerializerShim<T> serializerShim;
private final Function<Kryo, Serializer> functionOfShadedKryo;
private final int id;
private GryoTypeReg(final Class<T> clazz,
final Serializer<T> shadedSerializer,
final SerializerShim<T> serializerShim,
final Function<Kryo, Serializer> functionOfShadedKryo,
final int id) {
this.clazz = clazz;
this.shadedSerializer = shadedSerializer;
this.serializerShim = serializerShim;
this.functionOfShadedKryo = functionOfShadedKryo;
this.id = id;
int serializerCount = 0;
if (null != this.shadedSerializer)
serializerCount++;
if (null != this.serializerShim)
serializerCount++;
if (null != this.functionOfShadedKryo)
serializerCount++;
if (1 < serializerCount) {
final String msg = String.format(
"GryoTypeReg accepts at most one kind of serializer, but multiple " +
"serializers were supplied for class %s (id %s). " +
"Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.",
this.clazz.getCanonicalName(), id,
this.shadedSerializer, this.serializerShim, this.functionOfShadedKryo);
throw new IllegalArgumentException(msg);
}
}
private static <T> GryoTypeReg<T> of(final Class<T> clazz, final int id) {
return new GryoTypeReg<>(clazz, null, null, null, id);
}
private static <T> GryoTypeReg<T> of(final Class<T> clazz, final int id, final Serializer<T> shadedSerializer) {
return new GryoTypeReg<>(clazz, shadedSerializer, null, null, id);
}
private static <T> GryoTypeReg<T> of(final Class<T> clazz, final int id, final SerializerShim<T> serializerShim) {
return new GryoTypeReg<>(clazz, null, serializerShim, null, id);
}
private static <T> GryoTypeReg<T> of(final Class clazz, final int id, final Function<Kryo, Serializer> fct) {
return new GryoTypeReg<>(clazz, null, null, fct, id);
}
@Override
public Serializer<T> getShadedSerializer() {
return shadedSerializer;
}
@Override
public SerializerShim<T> getSerializerShim() {
return serializerShim;
}
@Override
public Function<Kryo, Serializer> getFunctionOfShadedKryo() {
return functionOfShadedKryo;
}
@Override
public Class<T> getTargetClass() {
return clazz;
}
@Override
public int getId() {
return id;
}
@Override
public Kryo registerWith(final Kryo kryo) {
if (null != functionOfShadedKryo)
kryo.register(clazz, functionOfShadedKryo.apply(kryo), id);
else if (null != shadedSerializer)
kryo.register(clazz, shadedSerializer, id);
else if (null != serializerShim)
kryo.register(clazz, new ShadedSerializerAdapter<>(serializerShim), id);
else {
kryo.register(clazz, kryo.getDefaultSerializer(clazz), id);
// Suprisingly, the preceding call is not equivalent to
// kryo.register(clazz, id);
}
return kryo;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("targetClass", clazz)
.append("id", id)
.append("shadedSerializer", shadedSerializer)
.append("serializerShim", serializerShim)
.append("functionOfShadedKryo", functionOfShadedKryo)
.toString();
}
}
}