blob: 7d1dc8d6691c78871ccb86cd49e5b55bbeb00d6e [file] [log] [blame]
package backtype.storm.serialization;
import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.utils.ListDelegate;
import backtype.storm.utils.Utils;
import carbonite.JavaBridge;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.ObjectBuffer;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.serialize.BigIntegerSerializer;
import com.esotericsoftware.kryo.serialize.SerializableSerializer;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.log4j.Logger;
public class SerializationFactory {
public static final Logger LOG = Logger.getLogger(SerializationFactory.class);
public static class KryoSerializableDefault extends Kryo {
boolean _override = false;
public void overrideDefault(boolean value) {
_override = value;
}
@Override
protected Serializer newDefaultSerializer(Class type) {
if(_override) {
return new SerializableSerializer();
} else {
return super.newDefaultSerializer(type);
}
}
}
public static ObjectBuffer getKryo(Map conf) {
KryoSerializableDefault k = new KryoSerializableDefault();
k.setRegistrationOptional((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION));
k.register(byte[].class);
k.register(ListDelegate.class);
k.register(ArrayList.class);
k.register(HashMap.class);
k.register(HashSet.class);
k.register(BigInteger.class, new BigIntegerSerializer());
k.register(TransactionAttempt.class);
JavaBridge clojureSerializersBridge = new JavaBridge();
clojureSerializersBridge.registerClojureCollections(k);
clojureSerializersBridge.registerClojurePrimitives(k);
Map<String, String> registrations = normalizeKryoRegister(conf);
boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
for(String klassName: registrations.keySet()) {
String serializerClassName = registrations.get(klassName);
try {
Class klass = Class.forName(klassName);
Class serializerClass = null;
if(serializerClassName!=null)
serializerClass = Class.forName(serializerClassName);
if(serializerClass == null) {
k.register(klass);
} else {
k.register(klass, (Serializer) serializerClass.newInstance());
}
} catch (ClassNotFoundException e) {
if(skipMissing) {
LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
} else {
throw new RuntimeException(e);
}
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
k.overrideDefault(true);
return new ObjectBuffer(k, 2000, 2000000000);
}
public static class IdDictionary {
Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
public IdDictionary(StormTopology topology) {
List<String> componentNames = new ArrayList<String>(topology.get_spouts().keySet());
componentNames.addAll(topology.get_bolts().keySet());
componentNames.addAll(topology.get_state_spouts().keySet());
for(String name: componentNames) {
ComponentCommon common = Utils.getComponentCommon(topology, name);
List<String> streams = new ArrayList<String>(common.get_streams().keySet());
streamNametoId.put(name, idify(streams));
streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
}
}
public int getStreamId(String component, String stream) {
return streamNametoId.get(component).get(stream);
}
public String getStreamName(String component, int stream) {
return streamIdToName.get(component).get(stream);
}
private static Map<String, Integer> idify(List<String> names) {
Collections.sort(names);
Map<String, Integer> ret = new HashMap<String, Integer>();
int i = 1;
for(String name: names) {
ret.put(name, i);
i++;
}
return ret;
}
}
private static Map<String, String> normalizeKryoRegister(Map conf) {
// TODO: de-duplicate this logic with the code in nimbus
Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
if(res==null) return new TreeMap<String, String>();
Map<String, String> ret = new HashMap<String, String>();
if(res instanceof Map) {
ret = (Map<String, String>) res;
} else {
for(Object o: (List) res) {
if(o instanceof Map) {
ret.putAll((Map) o);
} else {
ret.put((String) o, null);
}
}
}
//ensure always same order for registrations with TreeMap
return new TreeMap<String, String>(ret);
}
}