blob: 9e267767fe23039becc55bf72faa5d40dbe6b96e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.stellar.common.utils;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.Util;
import com.esotericsoftware.reflectasm.ConstructorAccess;
import de.javakaffee.kryoserializers.*;
import de.javakaffee.kryoserializers.cglib.CGLibProxySerializer;
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer;
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
import java.io.ByteArrayInputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Modifier;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.function.Function;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.objenesis.instantiator.ObjectInstantiator;
import org.objenesis.strategy.InstantiatorStrategy;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.esotericsoftware.kryo.util.Util.className;
/**
* Provides basic functionality to serialize and deserialize the allowed
* value types for a ProfileMeasurement.
*/
public class SerDeUtils {
protected static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class);
private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo ret = new Kryo();
ret.setReferences(true);
ret.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
ret.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
ret.register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
ret.register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
ret.register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
ret.register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
ret.register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
ret.register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
ret.register(GregorianCalendar.class, new GregorianCalendarSerializer());
ret.register(InvocationHandler.class, new JdkProxySerializer());
UnmodifiableCollectionsSerializer.registerSerializers(ret);
SynchronizedCollectionsSerializer.registerSerializers(ret);
// custom serializers for non-jdk libs
// register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below)
ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new CGLibProxySerializer());
// joda DateTime, LocalDate and LocalDateTime
ret.register(LocalDate.class, new JodaLocalDateSerializer());
ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer());
// guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet
ImmutableListSerializer.registerSerializers(ret);
ImmutableSetSerializer.registerSerializers(ret);
ImmutableMapSerializer.registerSerializers(ret);
ImmutableMultimapSerializer.registerSerializers(ret);
return ret;
}
};
/**
* This was backported from a more recent version of kryo than we currently run. The reason why it exists is
* that we want a strategy for instantiation of classes which attempts a no-arg constructor first and THEN falls
* back to reflection for performance reasons alone (this is, after all, in the critical path).
*
*/
static private class DefaultInstantiatorStrategy implements org.objenesis.strategy.InstantiatorStrategy {
private InstantiatorStrategy fallbackStrategy;
public DefaultInstantiatorStrategy () {
}
public DefaultInstantiatorStrategy (InstantiatorStrategy fallbackStrategy) {
this.fallbackStrategy = fallbackStrategy;
}
public void setFallbackInstantiatorStrategy (final InstantiatorStrategy fallbackStrategy) {
this.fallbackStrategy = fallbackStrategy;
}
public InstantiatorStrategy getFallbackInstantiatorStrategy () {
return fallbackStrategy;
}
@Override
public ObjectInstantiator newInstantiatorOf (final Class type) {
if (!Util.isAndroid) {
// Use ReflectASM if the class is not a non-static member class.
Class enclosingType = type.getEnclosingClass();
boolean isNonStaticMemberClass = enclosingType != null && type.isMemberClass()
&& !Modifier.isStatic(type.getModifiers());
if (!isNonStaticMemberClass) {
try {
final ConstructorAccess access = ConstructorAccess.get(type);
return new ObjectInstantiator() {
@Override
public Object newInstance () {
try {
return access.newInstance();
} catch (Exception ex) {
throw new KryoException("Error constructing instance of class: " + className(type), ex);
}
}
};
} catch (Exception ignored) {
}
}
}
// Reflection.
try {
Constructor ctor;
try {
ctor = type.getConstructor((Class[])null);
} catch (Exception ex) {
ctor = type.getDeclaredConstructor((Class[])null);
ctor.setAccessible(true);
}
final Constructor constructor = ctor;
return new ObjectInstantiator() {
@Override
public Object newInstance () {
try {
return constructor.newInstance();
} catch (Exception ex) {
throw new KryoException("Error constructing instance of class: " + className(type), ex);
}
}
};
} catch (Exception ignored) {
}
if (fallbackStrategy == null) {
if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers()))
throw new KryoException("Class cannot be created (non-static member class): " + className(type));
else
throw new KryoException("Class cannot be created (missing no-arg constructor): " + className(type));
}
// InstantiatorStrategy.
return fallbackStrategy.newInstantiatorOf(type);
}
}
public static Serializer SERIALIZER = new Serializer();
private static class Serializer implements Function<Object, byte[]> {
/**
* Serializes the given Object into bytes.
*
*/
@Override
public byte[] apply(Object o) {
return toBytes(o);
}
}
public static class Deserializer<T> implements Function<byte[], T> {
private Class<T> clazz;
public Deserializer(Class<T> clazz) {
this.clazz = clazz;
}
/**
* Deserializes the given bytes.
*
* @param bytes the function argument
* @return the function result
*/
@Override
public T apply(byte[] bytes) {
return fromBytes(bytes, clazz);
}
}
private SerDeUtils() {
// do not instantiate
}
/**
* Serialize a profile measurement's value.
*
* The value produced by a Profile definition can be any numeric data type. The data
* type depends on how the profile is defined by the user. The user should be able to
* choose the data type that is most suitable for their use case.
*
* @param value The value to serialize.
*/
public static byte[] toBytes(Object value) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output output = new Output(bos);
kryo.get().writeClassAndObject(output, value);
output.flush();
bos.flush();
return bos.toByteArray();
}
catch(Throwable t) {
LOG.error("Unable to serialize: " + value + " because " + t.getMessage(), t);
throw new IllegalStateException("Unable to serialize " + value + " because " + t.getMessage(), t);
}
}
/**
* Deserialize a profile measurement's value.
*
* The value produced by a Profile definition can be any numeric data type. The data
* type depends on how the profile is defined by the user. The user should be able to
* choose the data type that is most suitable for their use case.
*
* @param value The value to deserialize.
*/
public static <T> T fromBytes(byte[] value, Class<T> clazz) {
try {
Input input = new Input(new ByteArrayInputStream(value));
return clazz.cast(kryo.get().readClassAndObject(input));
}
catch(Throwable t) {
LOG.error("Unable to deserialize because " + t.getMessage(), t);
throw t;
}
}
}