Set to Kryo serializer when registerKryoSerialization is called (#3283)
* Set to Kryo serializer when registerKryoSerialization is called. This is to make the behavior more consistent to Storm API (which uses Kryo to serialize objects all the time)
diff --git a/heron/api/src/java/org/apache/heron/api/Config.java b/heron/api/src/java/org/apache/heron/api/Config.java
index 075ebb5..bc7ca17 100644
--- a/heron/api/src/java/org/apache/heron/api/Config.java
+++ b/heron/api/src/java/org/apache/heron/api/Config.java
@@ -110,6 +110,15 @@
* The serialization class that is used to serialize/deserialize tuples
*/
public static final String TOPOLOGY_SERIALIZER_CLASSNAME = "topology.serializer.classname";
+
+ /**
+ * The serializers available for TOPOLOGY_SERIALIZER_CLASSNAME.
+ */
+ public static final String HERON_JAVA_SERIALIZER_CLASS_NAME =
+ "org.apache.heron.api.serializer.JavaSerializer";
+ public static final String HERON_KRYO_SERIALIZER_CLASS_NAME =
+ "org.apache.heron.api.serializer.KryoSerializer";
+
/**
* Class that specifies how to create a Kryo instance for serialization. Heron will then apply
* topology.kryo.register. The default implementation
@@ -480,11 +489,19 @@
}
public static void registerKryoSerialization(Map<String, Object> conf, Class klass) {
+ // Set to Kryo Seralizer when this function is called. In Storm API, Kryo seralizer is
+ // the default but it is not in Heron. This implicit set could make migration easier
+ // since it is more consistent.
+ setSerializationClassName(conf, HERON_KRYO_SERIALIZER_CLASS_NAME);
getRegisteredKryoSerializations(conf).add(klass.getName());
}
public static void registerKryoSerialization(
Map<String, Object> conf, Class klass, Class<? extends Serializer> serializerClass) {
+ // Set to Kryo Seralizer when this function is called. In Storm API, Kryo seralizer is
+ // the default but it is not in Heron. This implicit set could make migration easier
+ // since it is more consistent.
+ setSerializationClassName(conf, HERON_KRYO_SERIALIZER_CLASS_NAME);
Map<String, String> register = new HashMap<>();
register.put(klass.getName(), serializerClass.getName());
getRegisteredKryoSerializations(conf).add(register);