blob: 9f9f1442a9e061f2cb4ee97e366088830687058f [file] [log] [blame]
package org.apache.samoa.topology.impl;
/*
* #%L
* SAMOA
* %%
* Copyright (C) 2014 - 2015 Apache Software Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.nio.ByteBuffer;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.samoa.learners.classifiers.trees.AttributeContentEvent;
import org.apache.samoa.learners.classifiers.trees.ComputeContentEvent;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
/**
 * Kryo-based {@link SerializerDeserializer} implementation.
 * <p>
 * One {@link Kryo} instance and one {@link Output} buffer are kept per thread through {@link ThreadLocal}s, so every
 * calling thread works on its own instances. Dedicated serializers are registered for {@link AttributeContentEvent}
 * and {@link ComputeContentEvent}; registration is not required for other classes.
 */
public class SamoaSerializer implements SerializerDeserializer {

  private final ThreadLocal<Kryo> kryoThreadLocal;
  private final ThreadLocal<Output> outputThreadLocal;

  /** Initial size, in bytes, of each per-thread output buffer. */
  private final int initialBufferSize = 2048;

  /** Maximum size, in bytes, that a per-thread output buffer created from now on may grow to. */
  private int maxBufferSize = 256 * 1024;

  /**
   * Sets the maximum size of the per-thread output buffers.
   * <p>
   * The value is read when a thread creates its {@link Output}, i.e. on that thread's first call to
   * {@link #serialize(Object)}. Threads that have already created their buffer keep the limit that was in effect at
   * that time.
   *
   * @param maxBufferSize
   *          maximum buffer size in bytes
   */
  public void setMaxBufferSize(int maxBufferSize) {
    this.maxBufferSize = maxBufferSize;
  }

  /**
   * Creates a serializer whose Kryo instances resolve classes through the given classloader.
   *
   * @param classLoader
   *          classloader able to handle classes to serialize/deserialize. For instance, application-level events can
   *          only be handled by the application classloader.
   */
  @Inject
  public SamoaSerializer(@Assisted final ClassLoader classLoader) {
    kryoThreadLocal = new ThreadLocal<Kryo>() {
      @Override
      protected Kryo initialValue() {
        Kryo kryo = new Kryo();
        kryo.setClassLoader(classLoader);
        kryo.register(AttributeContentEvent.class, new AttributeContentEvent.AttributeCEFullPrecSerializer());
        kryo.register(ComputeContentEvent.class, new ComputeContentEvent.ComputeCEFullPrecSerializer());
        kryo.setRegistrationRequired(false);
        return kryo;
      }
    };

    outputThreadLocal = new ThreadLocal<Output>() {
      @Override
      protected Output initialValue() {
        // Sizes are read here, at first use by the calling thread, not at construction time.
        return new Output(initialBufferSize, maxBufferSize);
      }
    };
  }

  /**
   * Deserializes the object contained in the remaining bytes of {@code rawMessage} (from its position up to its
   * limit). The position of {@code rawMessage} itself is not modified.
   *
   * @param rawMessage
   *          buffer holding a message previously produced by {@link #serialize(Object)}; heap, direct and read-only
   *          buffers are all accepted
   * @return the deserialized object
   */
  @Override
  public Object deserialize(ByteBuffer rawMessage) {
    Input input = toInput(rawMessage);
    try {
      return kryoThreadLocal.get().readClassAndObject(input);
    } finally {
      input.close();
    }
  }

  /**
   * Serializes {@code message}, together with its class information, using the calling thread's Kryo instance.
   *
   * @param message
   *          object to serialize
   * @return a buffer wrapping a fresh byte array that contains the serialized form
   */
  @SuppressWarnings("resource")
  @Override
  public ByteBuffer serialize(Object message) {
    Output output = outputThreadLocal.get();
    try {
      kryoThreadLocal.get().writeClassAndObject(output, message);
      return ByteBuffer.wrap(output.toBytes());
    } finally {
      // The Output is reused by later calls on this thread, so it is reset even when serialization fails.
      output.clear();
    }
  }

  /** Wraps exactly the remaining bytes of {@code buffer} in a Kryo {@link Input} without moving its position. */
  private static Input toInput(ByteBuffer buffer) {
    if (buffer.hasArray()) {
      // Honour slices/views: the readable region starts at arrayOffset + position and spans remaining() bytes.
      return new Input(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
    }
    // Direct or read-only buffers expose no backing array; copy through a duplicate so the caller's position is
    // left untouched.
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);
    return new Input(bytes);
  }
}