// blob: d0e46dad48182ed802ea5ba0f025a4f80279098c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.util.collection.CompactBuffer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ImmutablePath;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.MutablePath;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoSerializersV1d0;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoVersion;
import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter;
import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.SystemUtil;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.mutable.WrappedArray;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
/**
 * A {@code spark.kryo.registrator} implementation that installs TinkerPop types.
 * This is intended for use with {@code spark.serializer=KryoSerializer}, not GryoSerializer.
 * <p>
 * Registration is performed in three passes, always in the same order:
 * <ol>
 * <li>the Spark/Scala/Hadoop/TinkerPop types listed in {@link #getExtraRegistrations()},</li>
 * <li>the default type registrations of a {@link GryoVersion#V1_0} {@link GryoMapper},</li>
 * <li>the Gryo entries of every {@link IoRegistry} configured through the {@code tinkerpop} system properties.</li>
 * </ol>
 */
public class GryoRegistrator implements KryoRegistrator {

    private static final Logger log = LoggerFactory.getLogger(GryoRegistrator.class);

    /**
     * Register TinkerPop's classes with the supplied {@link Kryo} instance using no overrides and no blacklist.
     *
     * @param kryo the Kryo serializer instance with which to register types
     */
    @Override
    public void registerClasses(final Kryo kryo) {
        registerClasses(kryo, Collections.emptyMap(), Collections.emptySet());
    }

    /**
     * Register TinkerPop's classes with the supplied {@link Kryo} instance
     * while honoring optional serializer overrides and an optional class blacklist.
     *
     * @param kryo                the Kryo serializer instance with which to register types
     * @param serializerOverrides serializer mappings that override this class's defaults; a {@code null} value
     *                            for a key means "register with Kryo's default serializer"; a {@code null} map
     *                            is treated as empty
     * @param blacklist           classes which should not be registered at all, even if there is an override entry
     *                            or if they would otherwise be registered by this class, including registrations
     *                            coming from a configured {@link IoRegistry} (does not affect Kryo's
     *                            built-in registrations, e.g. String.class); a {@code null} set is treated as empty
     */
    public void registerClasses(final Kryo kryo, final Map<Class<?>, Serializer<?>> serializerOverrides, final Set<Class<?>> blacklist) {
        // Tolerate null arguments instead of failing with an NPE in the middle of registration
        final Map<Class<?>, Serializer<?>> overrides = null == serializerOverrides
                ? Collections.<Class<?>, Serializer<?>>emptyMap()
                : serializerOverrides;
        final Set<Class<?>> excluded = null == blacklist
                ? Collections.<Class<?>>emptySet()
                : blacklist;

        // The order of these three passes is kept stable on purpose: do not reorder them
        registerExtraTypes(kryo, overrides, excluded);
        registerGryoMapperDefaults(kryo, overrides, excluded);
        registerIoRegistryTypes(kryo, excluded);
    }

    /**
     * Applies the TinkerPop type registrations copied from GryoSerializer's constructor
     * (see {@link #getExtraRegistrations()}).
     * The blacklist takes precedence over the overrides, which take precedence over the built-in mapping.
     */
    private void registerExtraTypes(final Kryo kryo, final Map<Class<?>, Serializer<?>> serializerOverrides, final Set<Class<?>> blacklist) {
        for (final Map.Entry<Class<?>, Serializer<?>> ent : getExtraRegistrations().entrySet()) {
            final Class<?> targetClass = ent.getKey();
            final Serializer<?> ser = ent.getValue();

            // Is this class blacklisted? Skip it. (takes precedence over serializerOverrides)
            if (blacklist.contains(targetClass)) {
                log.debug("Not registering serializer for {} (blacklisted)", targetClass);
                continue;
            }

            // An override, when present, has already performed the registration
            if (checkForAndApplySerializerOverride(serializerOverrides, kryo, targetClass)) {
                continue;
            }

            if (null == ser) {
                log.debug("Registering {} with default serializer", targetClass);
                kryo.register(targetClass);
            } else {
                log.debug("Registering {} with serializer {}", targetClass, ser);
                kryo.register(targetClass, ser);
            }
        }
    }

    /**
     * Applies the default registrations of a {@link GryoVersion#V1_0} {@link GryoMapper}, converting each
     * registration into something Spark's unshaded Kryo can use:
     * <ul>
     * <li>a shaded {@code JavaSerializer} becomes an unshaded {@link JavaSerializer},</li>
     * <li>a {@link SerializerShim} is wrapped in an {@link UnshadedSerializerAdapter},</li>
     * <li>any other shaded serializer, or a function of shaded Kryo, is logged as an error and replaced by
     * Kryo's default serializer,</li>
     * <li>a registration without any serializer uses Kryo's default serializer.</li>
     * </ul>
     * Logs a warning afterwards if {@link StarGraph} did not end up registered through a serializer shim.
     */
    private void registerGryoMapperDefaults(final Kryo kryo, final Map<Class<?>, Serializer<?>> serializerOverrides, final Set<Class<?>> blacklist) {
        final Set<Class<?>> shimmedClassesFromGryoMapper = new HashSet<>();

        for (final TypeRegistration<?> tr : GryoMapper.build().version(GryoVersion.V1_0).create().getTypeRegistrations()) {
            // Is this class blacklisted? Skip it. (takes precedence over serializerOverrides)
            if (blacklist.contains(tr.getTargetClass())) {
                log.debug("Not registering serializer for {} (blacklisted)", tr.getTargetClass());
                continue;
            }

            final org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer();
            final SerializerShim<?> serializerShim = tr.getSerializerShim();
            final java.util.function.Function<
                    org.apache.tinkerpop.shaded.kryo.Kryo,
                    org.apache.tinkerpop.shaded.kryo.Serializer> functionOfShadedKryo = tr.getFunctionOfShadedKryo();

            // Overrides have the highest precedence among the non-blacklisted cases
            if (checkForAndApplySerializerOverride(serializerOverrides, kryo, tr.getTargetClass())) {
                continue;
            }

            if (null != shadedSerializer) {
                if (shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) {
                    // Convert GryoMapper's shaded JavaSerializer mappings to their unshaded equivalents
                    log.debug("Registering {} with JavaSerializer", tr.getTargetClass());
                    kryo.register(tr.getTargetClass(), new JavaSerializer());
                } else {
                    // There's supposed to be a check in GryoMapper that prevents this from happening
                    log.error("GryoMapper's default serialization registration for {} is a {}. " +
                                    "This is probably a bug in TinkerPop (this is not a valid default registration). " +
                                    "I am configuring Spark to use Kryo's default serializer for this class, " +
                                    "but this may cause serialization failures at runtime.",
                            tr.getTargetClass(),
                            org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
                    kryo.register(tr.getTargetClass());
                }
            } else if (null != serializerShim) {
                // Wrap shim serializers in an adapter for Spark's unshaded Kryo
                log.debug("Registering {} to serializer shim {} (serializer shim {})",
                        tr.getTargetClass(), serializerShim, serializerShim.getClass());
                kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim));
                shimmedClassesFromGryoMapper.add(tr.getTargetClass());
            } else if (null != functionOfShadedKryo) {
                // As with shaded serializers, there's supposed to be a check in GryoMapper that prevents this from happening
                log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>. " +
                                "This is probably a bug in TinkerPop (this is not a valid default registration). " +
                                "I am configuring Spark to use Kryo's default serializer instead of this function, " +
                                "but this may cause serialization failures at runtime.",
                        tr.getTargetClass(),
                        org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
                        org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
                kryo.register(tr.getTargetClass());
            } else {
                // Register all other classes with the default behavior (FieldSerializer)
                log.debug("Registering {} with default serializer", tr.getTargetClass());
                kryo.register(tr.getTargetClass());
            }
        }

        // StarGraph's shim serializer is especially important on Spark for efficiency reasons,
        // so log a warning if we failed to register it somehow
        if (!shimmedClassesFromGryoMapper.contains(StarGraph.class)) {
            log.warn("No SerializerShim found for StarGraph");
        }
    }

    /**
     * Registers the Gryo entries of every {@link IoRegistry} created from the {@code tinkerpop}
     * system properties. Blacklisted classes are skipped; serializer overrides are not consulted here
     * because these entries are user-supplied rather than defaults of this class.
     */
    private void registerIoRegistryTypes(final Kryo kryo, final Set<Class<?>> blacklist) {
        for (final IoRegistry registry : IoRegistryHelper.createRegistries(SystemUtil.getSystemPropertiesConfiguration("tinkerpop", true))) {
            for (final Pair<Class, Object> pair : registry.find(GryoIo.class)) {
                // The blacklist means "never register", so it applies to IoRegistry entries as well
                if (blacklist.contains(pair.getValue0())) {
                    log.debug("Not registering serializer for {} (blacklisted)", pair.getValue0());
                    continue;
                }

                if (pair.getValue1() instanceof SerializerShim) {
                    kryo.register(pair.getValue0(), new UnshadedSerializerAdapter((SerializerShim) pair.getValue1()));
                } else if (pair.getValue1() instanceof ShadedSerializerAdapter) {
                    // Unwrap the shim from the shaded adapter and re-wrap it for Spark's unshaded Kryo
                    kryo.register(pair.getValue0(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) pair.getValue1()).getSerializerShim()));
                } else {
                    // Explicitly (re-)bind the class to Kryo's default serializer for it
                    kryo.register(pair.getValue0(), kryo.getDefaultSerializer(pair.getValue0()));
                }
            }
        }
    }

    /**
     * Builds the Spark, Scala, Hadoop and TinkerPop registrations that are not part of GryoMapper's defaults.
     *
     * @return an insertion-ordered map from class to serializer; a {@code null} value means
     *         "register with Kryo's default serializer"
     * @throws IllegalStateException if one of the Scala/Spark classes looked up by name is not on the classpath
     */
    private LinkedHashMap<Class<?>, Serializer<?>> getExtraRegistrations() {
        /* The map returned by this method MUST have a fixed iteration order!
         *
         * The order itself is irrelevant, so long as it is completely stable at runtime.
         *
         * LinkedHashMap satisfies this requirement (its contract specifies
         * iteration in key-insertion-order).
         */
        final LinkedHashMap<Class<?>, Serializer<?>> m = new LinkedHashMap<>();
        // The following entries were copied from GryoSerializer's constructor
        // This could be turned into a static collection on GryoSerializer to avoid
        // duplication, but it would be a bit cumbersome to do so without disturbing
        // the ordering of the existing entries in that constructor, since not all
        // of the entries are for TinkerPop (and the ordering is significant).
        try {
            // These classes are resolved by name rather than referenced directly
            m.put(Class.forName("scala.reflect.ManifestFactory$AnyManifest"), new JavaSerializer());
            m.put(Class.forName("scala.reflect.ClassTag$GenericClassTag"), new JavaSerializer());
            m.put(Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"), new JavaSerializer());
            m.put(Class.forName("org.apache.spark.internal.io.FileCommitProtocol$EmptyTaskCommitMessage$"), new JavaSerializer());
        } catch (final ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        m.put(WrappedArray.ofRef.class, null);
        m.put(MessagePayload.class, null);
        m.put(ViewIncomingPayload.class, null);
        m.put(ViewOutgoingPayload.class, null);
        m.put(ViewPayload.class, null);
        m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
        m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
        //
        m.put(HadoopVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexSerializer()));
        m.put(HadoopVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexPropertySerializer()));
        m.put(HadoopProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PropertySerializer()));
        m.put(HadoopEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.EdgeSerializer()));
        //
        m.put(ComputerGraph.ComputerVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexSerializer()));
        m.put(ComputerGraph.ComputerVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexPropertySerializer()));
        m.put(ComputerGraph.ComputerProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PropertySerializer()));
        m.put(ComputerGraph.ComputerEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.EdgeSerializer()));
        //
        m.put(StarGraph.StarEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.EdgeSerializer()));
        m.put(StarGraph.StarVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexSerializer()));
        m.put(StarGraph.StarProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PropertySerializer()));
        m.put(StarGraph.StarVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexPropertySerializer()));
        //
        m.put(MutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PathSerializer()));
        m.put(ImmutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PathSerializer()));
        //
        m.put(CompactBuffer[].class, null);
        // TODO: VoidSerializer is a default serializer and thus, may not be needed (if it is, you can't use FieldSerializer)
        // TODO: We will have to copy/paste the shaded DefaultSerializer.VoidSerializer into an unshaded form.
        //m.put(void.class, null);
        //m.put(Void.class, null);
        return m;
    }

    /**
     * Registers {@code targetClass} according to {@code serializerOverrides}, if it has an entry there.
     *
     * @param serializerOverrides the caller-supplied override mappings (must not be {@code null})
     * @param kryo                the Kryo instance to register with
     * @param targetClass         the class being registered
     * @return {@code true} if an override entry existed and the class was registered here,
     *         {@code false} if the caller still has to register the class itself
     */
    private boolean checkForAndApplySerializerOverride(final Map<Class<?>, Serializer<?>> serializerOverrides,
                                                       final Kryo kryo, final Class<?> targetClass) {
        // containsKey is required: a key mapped to null is a meaningful override, not a missing entry
        if (!serializerOverrides.containsKey(targetClass)) {
            return false;
        }

        final Serializer<?> ser = serializerOverrides.get(targetClass);
        if (null == ser) {
            // null means use Kryo's default serializer
            log.debug("Registering {} with default serializer per overrides", targetClass);
            kryo.register(targetClass);
        } else {
            // nonnull means use that serializer
            log.debug("Registering {} with serializer {} per overrides", targetClass, ser);
            kryo.register(targetClass, ser);
        }
        return true;
    }
}