// blob: cdbe35963efd83961fb0ee64785b7c429e4b294a [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.codec;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.annotation.Nullable;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import static java.util.Objects.requireNonNull;
/**
 * Codec that works for thrift objects.
 *
 * <p>Offers plain binary-protocol encoding and decoding, plus DEFLATE-compressed variants of
 * both. All functionality is exposed through static methods.
 */
public final class ThriftBinaryCodec {

  /**
   * Protocol factory used for all thrift encoding and decoding.
   */
  public static final TProtocolFactory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();

  private ThriftBinaryCodec() {
    // Utility class.
  }

  /**
   * Identical to {@link #decodeNonNull(Class, byte[])}, but allows for a null buffer.
   *
   * @param clazz Class to instantiate and deserialize to.
   * @param buffer Buffer to decode.
   * @param <T> Target type.
   * @return A populated message, or {@code null} if the buffer was {@code null}.
   * @throws CodingException If the message could not be decoded.
   */
  @Nullable
  public static <T extends TBase<T, ?>> T decode(Class<T> clazz, @Nullable byte[] buffer)
      throws CodingException {

    if (buffer == null) {
      return null;
    }
    return decodeNonNull(clazz, buffer);
  }

  /**
   * Decodes a binary-encoded byte array into a target type.
   *
   * @param clazz Class to instantiate and deserialize to.
   * @param buffer Buffer to decode.
   * @param <T> Target type.
   * @return A populated message.
   * @throws CodingException If the message could not be decoded.
   * @throws NullPointerException If {@code clazz} or {@code buffer} is {@code null}.
   */
  public static <T extends TBase<T, ?>> T decodeNonNull(Class<T> clazz, byte[] buffer)
      throws CodingException {

    requireNonNull(clazz);
    requireNonNull(buffer);

    try {
      T t = newInstance(clazz);
      new TDeserializer(PROTOCOL_FACTORY).deserialize(t, buffer);
      return t;
    } catch (TException e) {
      throw new CodingException("Failed to deserialize thrift object.", e);
    }
  }

  /**
   * Identical to {@link #encodeNonNull(TBase)}, but allows for a null input.
   *
   * @param tBase Object to encode.
   * @return Encoded object, or {@code null} if the argument was {@code null}.
   * @throws CodingException If the object could not be encoded.
   */
  @Nullable
  public static byte[] encode(@Nullable TBase<?, ?> tBase) throws CodingException {
    if (tBase == null) {
      return null;
    }
    return encodeNonNull(tBase);
  }

  /**
   * Encodes a thrift object into a binary array.
   *
   * @param tBase Object to encode.
   * @return Encoded object.
   * @throws CodingException If the object could not be encoded.
   * @throws NullPointerException If {@code tBase} is {@code null}.
   */
  public static byte[] encodeNonNull(TBase<?, ?> tBase) throws CodingException {
    requireNonNull(tBase);

    try {
      return new TSerializer(PROTOCOL_FACTORY).serialize(tBase);
    } catch (TException e) {
      throw new CodingException("Failed to serialize: " + tBase, e);
    }
  }

  // See http://www.zlib.net/zlib_how.html
  // "If the memory is available, buffers sizes on the order of 128K or 256K bytes should be used."
  private static final int DEFLATER_BUFFER_SIZE = Amount.of(256, Data.KB).as(Data.BYTES);

  // Empirical from microbenchmarks (assuming 20MiB/s writes to the replicated log and a large
  // de-duplicated Snapshot from a production environment).
  // TODO(ksweeney): Consider making this configurable.
  private static final int DEFLATE_LEVEL = 3;

  /**
   * Encodes a thrift object into a DEFLATE-compressed binary array.
   *
   * @param tBase Object to encode.
   * @return Deflated, encoded object.
   * @throws CodingException If the object could not be encoded.
   * @throws NullPointerException If {@code tBase} is {@code null}.
   */
  public static byte[] deflateNonNull(TBase<?, ?> tBase) throws CodingException {
    requireNonNull(tBase);

    // NOTE: Buffering is needed here for performance.
    // There are actually 2 buffers in play here - the BufferedOutputStream prevents thrift from
    // causing a call to deflate() on every encoded primitive. The DeflaterOutputStream buffer
    // allows the underlying Deflater to operate on a larger chunk at a time without stopping to
    // copy the intermediate compressed output to outBytes.
    // See http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4986239
    //
    // The Deflater is held in a local so that it can be explicitly ended below: a
    // DeflaterOutputStream only releases a Deflater it created itself, never one handed to it
    // by the caller, so without the explicit end() its native memory lingers until GC.
    Deflater deflater = new Deflater(DEFLATE_LEVEL);
    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
    TTransport transport = new TIOStreamTransport(
        new BufferedOutputStream(
            new DeflaterOutputStream(outBytes, deflater, DEFLATER_BUFFER_SIZE),
            DEFLATER_BUFFER_SIZE));

    try {
      TProtocol protocol = PROTOCOL_FACTORY.getProtocol(transport);
      tBase.write(protocol);
      transport.close(); // calls finish() on the underlying stream, completing the compression
      return outBytes.toByteArray();
    } catch (TException e) {
      throw new CodingException("Failed to serialize: " + tBase, e);
    } finally {
      try {
        // Covers the failure path, where the close() inside the try block was never reached.
        transport.close();
      } finally {
        // Must happen after the stream is closed; closing still drives the deflater.
        deflater.end();
      }
    }
  }

  /**
   * Decodes a thrift object from a DEFLATE-compressed byte array into a target type.
   *
   * @param clazz Class to instantiate and deserialize to.
   * @param buffer Compressed buffer to decode.
   * @param <T> Target type.
   * @return A populated message.
   * @throws CodingException If the message could not be decoded.
   * @throws NullPointerException If {@code clazz} or {@code buffer} is {@code null}.
   */
  public static <T extends TBase<T, ?>> T inflateNonNull(Class<T> clazz, byte[] buffer)
      throws CodingException {

    requireNonNull(clazz);
    requireNonNull(buffer);

    T tBase = newInstance(clazz);
    TTransport transport = new TIOStreamTransport(
        new InflaterInputStream(new ByteArrayInputStream(buffer)));
    try {
      TProtocol protocol = PROTOCOL_FACTORY.getProtocol(transport);
      tBase.read(protocol);
      return tBase;
    } catch (TException e) {
      throw new CodingException("Failed to deserialize: " + e, e);
    } finally {
      transport.close();
    }
  }

  /**
   * Reflectively creates an instance of the target type via its public no-args constructor.
   *
   * @param clazz Class to instantiate.
   * @param <T> Target type.
   * @return A new instance of {@code clazz}.
   * @throws CodingException If the constructor is missing, inaccessible, or fails.
   */
  private static <T extends TBase<T, ?>> T newInstance(Class<T> clazz) throws CodingException {
    try {
      return clazz.getConstructor().newInstance();
    } catch (InvocationTargetException e) {
      throw new CodingException("Exception in constructor for target type: " + e, e);
    } catch (NoSuchMethodException e) {
      throw new CodingException(
          "No no-args constructor for target type: "
              + clazz
              + ". Did the thrift code generator change?",
          e);
    } catch (InstantiationException e) {
      throw new CodingException("Failed to instantiate target type.", e);
    } catch (IllegalAccessException e) {
      throw new CodingException("Failed to access constructor for target type.", e);
    }
  }

  /**
   * Thrown when serialization or deserialization failed.
   */
  public static class CodingException extends RuntimeException {
    /**
     * Creates an exception with a description of the failure.
     *
     * @param message Description of the failure.
     */
    public CodingException(String message) {
      super(message);
    }

    /**
     * Creates an exception with a description of the failure and its underlying cause.
     *
     * @param msg Description of the failure.
     * @param cause Underlying exception that triggered the failure.
     */
    public CodingException(String msg, Throwable cause) {
      super(msg, cause);
    }
  }
}