blob: 2c5b68c592c0a18fcc7dfaf0414b6b8f38b20ee0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.io;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Simple serialization / deserialization methods for the {@link SimpleVersionedSerializer}.
 *
 * <p>All methods in this class frame the serialized datum with a header consisting of the
 * serializer version (four bytes) followed by the length of the serialized datum (four bytes).
 */
@PublicEvolving
public class SimpleVersionedSerialization {

    /** Size of the header in the byte array format: four bytes version plus four bytes length. */
    private static final int HEADER_LENGTH = 8;

    /**
     * Serializes the version and datum into a stream.
     *
     * <p>Data serialized via this method can be deserialized via
     * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, DataInputView)}.
     *
     * <p>The first four bytes will be occupied by the version, as returned by
     * {@link SimpleVersionedSerializer#getVersion()}. The next four bytes will hold the length
     * of the serialized datum, followed by the serialized datum itself, as produced by
     * {@link SimpleVersionedSerializer#serialize(Object)}. The written output will hence be
     * eight bytes larger than the serialized datum.
     *
     * @param serializer The serializer to serialize the datum with.
     * @param datum The datum to serialize.
     * @param out The stream to serialize to.
     *
     * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#serialize(Object)}
     *                     method and from writing to the stream are forwarded.
     */
    public static <T> void writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out) throws IOException {
        checkNotNull(serializer, "serializer");
        checkNotNull(datum, "datum");
        checkNotNull(out, "out");

        // serialize first, so that nothing is written to the stream if serialization fails
        final byte[] data = serializer.serialize(datum);

        out.writeInt(serializer.getVersion());
        out.writeInt(data.length);
        out.write(data);
    }

    /**
     * Deserializes the version and datum from a stream.
     *
     * <p>This method deserializes data serialized via
     * {@link #writeVersionAndSerialize(SimpleVersionedSerializer, Object, DataOutputView)}.
     *
     * <p>The first four bytes will be interpreted as the version. The next four bytes will be
     * interpreted as the length of the datum bytes, then length-many bytes will be read.
     * Finally, the datum is deserialized via the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
     * method.
     *
     * @param serializer The serializer to deserialize the datum with.
     * @param in The stream to deserialize from.
     *
     * @return The deserialized datum.
     *
     * @throws IOException Thrown if the length field is negative (corrupt data). Exceptions from
     *                     reading the stream and from the
     *                     {@link SimpleVersionedSerializer#deserialize(int, byte[])} method are forwarded.
     */
    public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, DataInputView in) throws IOException {
        checkNotNull(serializer, "serializer");
        checkNotNull(in, "in");

        final int version = in.readInt();
        final int length = in.readInt();

        // a negative length can only come from corrupt data; report it as such instead of
        // failing with a NegativeArraySizeException on the allocation below
        if (length < 0) {
            throw new IOException("Corrupt data, negative length field: " + length);
        }

        final byte[] data = new byte[length];
        in.readFully(data);

        return serializer.deserialize(version, data);
    }

    /**
     * Serializes the version and datum into a byte array. The first four bytes will be occupied by
     * the version (as returned by {@link SimpleVersionedSerializer#getVersion()}),
     * written in <i>big-endian</i> encoding. The next four bytes will hold the length of the
     * serialized datum, also in <i>big-endian</i> encoding. The remaining bytes will be the
     * serialized datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}.
     * The resulting array will hence be eight bytes larger than the serialized datum.
     *
     * <p>Data serialized via this method can be deserialized via
     * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, byte[])}.
     *
     * @param serializer The serializer to serialize the datum with.
     * @param datum The datum to serialize.
     *
     * @return A byte array containing the serialized version, length, and serialized datum.
     *
     * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#serialize(Object)}
     *                     method are forwarded.
     */
    public static <T> byte[] writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum) throws IOException {
        checkNotNull(serializer, "serializer");
        checkNotNull(datum, "datum");

        final byte[] data = serializer.serialize(datum);
        final byte[] versionAndData = new byte[data.length + HEADER_LENGTH];

        putIntBigEndian(versionAndData, 0, serializer.getVersion());
        putIntBigEndian(versionAndData, 4, data.length);

        // move the data to the array, right behind the header
        System.arraycopy(data, 0, versionAndData, HEADER_LENGTH, data.length);

        return versionAndData;
    }

    /**
     * Deserializes the version and datum from a byte array. The first four bytes will be read as
     * the version, in <i>big-endian</i> encoding. The next four bytes will be read as the length
     * of the datum, also in <i>big-endian</i> encoding. The remaining bytes will be passed to the
     * serializer for deserialization, via {@link SimpleVersionedSerializer#deserialize(int, byte[])}.
     *
     * <p>This method deserializes data serialized via
     * {@link #writeVersionAndSerialize(SimpleVersionedSerializer, Object)}. Arrays shorter than
     * the eight byte header are rejected by an argument precondition check.
     *
     * @param serializer The serializer to deserialize the datum with.
     * @param bytes The bytes to deserialize from.
     *
     * @return The deserialized datum.
     *
     * @throws IOException Thrown if the length field does not match the number of bytes that
     *                     follow the header. Exceptions from the
     *                     {@link SimpleVersionedSerializer#deserialize(int, byte[])} method are forwarded.
     */
    public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
        checkNotNull(serializer, "serializer");
        checkNotNull(bytes, "bytes");
        // the header (version + length) must be fully present before anything is read from it
        checkArgument(bytes.length >= HEADER_LENGTH, "byte array below minimum length (8 bytes)");

        final int version = getIntBigEndian(bytes, 0);
        final int length = getIntBigEndian(bytes, 4);
        final int numDataBytes = bytes.length - HEADER_LENGTH;

        // validate before copying, so corrupt input does not cause a needless array copy
        if (length != numDataBytes) {
            throw new IOException("Corrupt data, conflicting lengths. Length fields: " + length + ", data: " + numDataBytes);
        }

        final byte[] dataOnly = Arrays.copyOfRange(bytes, HEADER_LENGTH, bytes.length);
        return serializer.deserialize(version, dataOnly);
    }

    // ------------------------------------------------------------------------
    //  Helpers
    // ------------------------------------------------------------------------

    /** Writes the given int in big-endian encoding into four bytes of the target, starting at the offset. */
    private static void putIntBigEndian(byte[] target, int offset, int value) {
        target[offset] = (byte) (value >> 24);
        target[offset + 1] = (byte) (value >> 16);
        target[offset + 2] = (byte) (value >> 8);
        target[offset + 3] = (byte) value;
    }

    /** Reads an int in big-endian encoding from four bytes of the source, starting at the offset. */
    private static int getIntBigEndian(byte[] source, int offset) {
        return ((source[offset] & 0xff) << 24) |
                ((source[offset + 1] & 0xff) << 16) |
                ((source[offset + 2] & 0xff) << 8) |
                (source[offset + 3] & 0xff);
    }

    // ------------------------------------------------------------------------

    /** Utility class, not meant to be instantiated. */
    private SimpleVersionedSerialization() {}
}