blob: 90c0f25cf42825f96b616c4464bed7179af54382 [file] [log] [blame]
package org.apache.samoa.utils;
/*
* #%L
* SAMOA
* %%
* Copyright (C) 2014 - 2015 Apache Software Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.ByteArrayOutputStream;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Implementation of Samza's SerdeFactory that uses Kryo to serialize/deserialize objects
*
* @author Anh Thu Vu
* @param <T>
*
*/
public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> {
private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);
public static class SamzaKryoSerde<V> implements Serde<V> {
private Kryo kryo;
public SamzaKryoSerde(String registrationInfo) {
this.kryo = new Kryo();
this.register(registrationInfo);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void register(String registrationInfo) {
if (registrationInfo == null)
return;
String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA);
Class targetClass = null;
Class serializerClass = null;
Serializer serializer = null;
for (String info : infoList) {
String[] fields = info.split(SamzaConfigFactory.COLON);
targetClass = null;
serializerClass = null;
if (fields.length > 0) {
try {
targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK,
SamzaConfigFactory.DOLLAR_SIGN));
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (fields.length > 1) {
try {
serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK,
SamzaConfigFactory.DOLLAR_SIGN));
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (targetClass != null) {
if (serializerClass == null) {
kryo.register(targetClass);
}
else {
serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>) serializerClass);
kryo.register(targetClass, serializer);
}
}
else {
logger.info("Invalid registration info:{}", info);
}
}
}
@SuppressWarnings("rawtypes")
private static Serializer resolveSerializerInstance(Kryo k, Class superClass,
Class<? extends Serializer> serializerClass) {
try {
try {
return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
} catch (Exception ex1) {
try {
return serializerClass.getConstructor(Kryo.class).newInstance(k);
} catch (Exception ex2) {
try {
return serializerClass.getConstructor(Class.class).newInstance(superClass);
} catch (Exception ex3) {
return serializerClass.newInstance();
}
}
}
} catch (Exception ex) {
throw new IllegalArgumentException("Unable to create serializer \""
+ serializerClass.getName()
+ "\" for class: "
+ superClass.getName(), ex);
}
}
/*
* Implement Samza Serde interface
*/
@Override
public byte[] toBytes(V obj) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output output = new Output(bos);
kryo.writeClassAndObject(output, obj);
output.flush();
output.close();
return bos.toByteArray();
}
@SuppressWarnings("unchecked")
@Override
public V fromBytes(byte[] byteArr) {
Input input = new Input(byteArr);
Object obj = kryo.readClassAndObject(input);
input.close();
return (V) obj;
}
}
@Override
public Serde<T> getSerde(String name, Config config) {
return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
}
}