/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.internal;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.HashFunction;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
/**
* An Ism file is a prefix-encoded composite key-value file broken into shards. Each composite key
* is composed of a fixed number of component keys, and a fixed number of those component keys
* represents the shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further
* details on the data format. In addition to the data, there is a bloom filter and multiple
* indices to allow for efficient retrieval.
*
* <p>An Ism file is composed of these high level sections (in order):
*
* <ul>
* <li>shard block
* <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)
* <li>shard index
* <li>footer (See {@link Footer} for details on encoding format)
* </ul>
*
* <p>The shard block is composed of multiple copies of the following:
*
* <ul>
* <li>data block
* <li>data index
* </ul>
*
* <p>The data block is composed of multiple copies of the following:
*
* <ul>
* <li>key prefix (See {@link KeyPrefix} for details on encoding format)
* <li>unshared key bytes
* <li>value bytes
* <li>optional 0x00 0x00 bytes followed by metadata bytes (if the 0x00 0x00 bytes are not
* present, then there are no metadata bytes)
* </ul>
*
* Each key written into the data block must be in unsigned lexicographically increasing order,
* and the shard portion of each key must hash to the same shard id as all other keys within the
* same data block. The hashing function used is the <a
* href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">32-bit murmur3 algorithm, x86
* variant</a> (little-endian variant), using {@code 1225801234} as the seed value.
*
* <p>The data index is composed of {@code N} copies of the following:
*
* <ul>
* <li>key prefix (See {@link KeyPrefix} for details on encoding format)
* <li>unshared key bytes
* <li>byte offset to key prefix in data block (variable length long coding)
* </ul>
*
* <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
* the number of shard index records followed by that many shard index records. See {@link
* IsmShardCoder} for further details as to its encoding scheme.
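*
* <p>As a worked illustration of the data block layout (the keys and values are hypothetical),
* writing the keys {@code "aaa1"} and then {@code "aaa2"} into the same block produces entries of
* roughly the following shape:
*
* <pre>{@code
* KeyPrefix(0, 4)  "aaa1"  <value bytes>   // the first key shares nothing with a previous key
* KeyPrefix(3, 1)  "2"     <value bytes>   // shares the 3-byte prefix "aaa" with the previous key
* }</pre>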
*/
public class IsmFormat {
private static final int HASH_SEED = 1225801234;
private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
public static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
/**
* A record containing a composite key and either a value or metadata. The composite key must not
* contain the metadata key component placeholder if producing a value record, and must contain
* the metadata key component placeholder if producing a metadata record.
*
* <p>The composite key is a fixed number of component keys where the first {@code N} component
* keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
* further details.
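*
* <p>A minimal construction sketch (the key component values shown are illustrative only):
*
* <pre>{@code
* // A value record: the key components must not contain the metadata key placeholder.
* IsmRecord<String> valueRecord = IsmRecord.of(ImmutableList.of("user", 42L), "payload");
*
* // A metadata record: the key components must contain the metadata key placeholder.
* IsmRecord<String> metadataRecord =
*     IsmRecord.meta(ImmutableList.of(IsmFormat.getMetadataKey(), 42L), new byte[] {1, 2, 3});
* }</pre>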
*/
@AutoValue
public abstract static class IsmRecord<V> {
abstract List<?> keyComponents();
@Nullable
abstract V value();
@SuppressWarnings("mutable")
@Nullable
abstract byte[] metadata();
IsmRecord() {} // Prevent public constructor
/** Returns an IsmRecord with the specified key components and value. */
public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
checkArgument(
!isMetadataKey(keyComponents), "Expected key components to not contain metadata key.");
return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, value, null);
}
/** Returns an IsmRecord with the specified key components and metadata. */
public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
checkNotNull(metadata);
checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
checkArgument(
isMetadataKey(keyComponents), "Expected key components to contain metadata key.");
return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, null, metadata);
}
/** Returns the list of key components. */
public List<?> getKeyComponents() {
return keyComponents();
}
/** Returns the key component at the specified index. */
public Object getKeyComponent(int index) {
return keyComponents().get(index);
}
/** Returns the value. Throws {@link IllegalStateException} if this is not a value record. */
public V getValue() {
checkState(
!isMetadataKey(keyComponents()), "This is a metadata record and not a value record.");
return value();
}
/**
* Returns the metadata. Throws {@link IllegalStateException} if this is not a metadata record.
*/
public byte[] getMetadata() {
checkState(
isMetadataKey(keyComponents()), "This is a value record and not a metadata record.");
return metadata();
}
}
/**
* A {@link Coder} for {@link IsmRecord}s.
*
* <p>Note that this coder by itself will not produce an Ism file. This coder can be used to
* materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder is combined with
* an {@code IsmSink} will one produce an Ism file.
*
* <p>The {@link IsmRecord} encoded format is:
*
* <ul>
* <li>encoded key component 1 using key component coder 1
* <li>...
* <li>encoded key component N using key component coder N
* <li>encoded value using value coder
* </ul>
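*
* <p>A minimal construction sketch; the component coders shown are illustrative, and any
* deterministic coders may be used:
*
* <pre>{@code
* IsmRecordCoder<Long> coder =
*     IsmRecordCoder.of(
*         1, // number of shard key coders for value records
*         1, // number of shard key coders for metadata records
*         ImmutableList.<Coder<?>>of(MetadataKeyCoder.of(StringUtf8Coder.of())),
*         VarLongCoder.of());
* }</pre>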
*/
public static class IsmRecordCoder<V> extends CustomCoder<IsmRecord<V>> {
/** Returns an IsmRecordCoder with the specified key component coders and value coder. */
public static <V> IsmRecordCoder<V> of(
int numberOfShardKeyCoders,
int numberOfMetadataShardKeyCoders,
List<Coder<?>> keyComponentCoders,
Coder<V> valueCoder) {
checkNotNull(keyComponentCoders);
checkArgument(keyComponentCoders.size() > 0);
checkArgument(numberOfShardKeyCoders > 0);
checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size());
checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size());
return new IsmRecordCoder<>(
numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder);
}
private final int numberOfShardKeyCoders;
private final int numberOfMetadataShardKeyCoders;
private final List<Coder<?>> keyComponentCoders;
private final Coder<V> valueCoder;
private IsmRecordCoder(
int numberOfShardKeyCoders,
int numberOfMetadataShardKeyCoders,
List<Coder<?>> keyComponentCoders,
Coder<V> valueCoder) {
this.numberOfShardKeyCoders = numberOfShardKeyCoders;
this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
this.keyComponentCoders = keyComponentCoders;
this.valueCoder = valueCoder;
}
/** Returns the list of key component coders. */
public List<Coder<?>> getKeyComponentCoders() {
return keyComponentCoders;
}
/** Returns the key coder at the specified index. */
@SuppressWarnings("unchecked")
public <T> Coder<T> getKeyComponentCoder(int index) {
return (Coder<T>) keyComponentCoders.get(index);
}
/** Returns the value coder. */
public Coder<V> getValueCoder() {
return valueCoder;
}
@Override
public void encode(IsmRecord<V> value, OutputStream outStream)
throws CoderException, IOException {
if (value.getKeyComponents().size() != keyComponentCoders.size()) {
throw new CoderException(
String.format(
"Expected %s key component(s) but received key component(s) %s.",
keyComponentCoders.size(), value.getKeyComponents()));
}
for (int i = 0; i < keyComponentCoders.size(); ++i) {
getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream);
}
if (isMetadataKey(value.getKeyComponents())) {
ByteArrayCoder.of().encode(value.getMetadata(), outStream);
} else {
valueCoder.encode(value.getValue(), outStream);
}
}
@Override
public IsmRecord<V> decode(InputStream inStream) throws CoderException, IOException {
List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
for (Coder<?> keyCoder : keyComponentCoders) {
keyComponents.add(keyCoder.decode(inStream));
}
if (isMetadataKey(keyComponents)) {
return IsmRecord.meta(keyComponents, ByteArrayCoder.of().decode(inStream));
} else {
return IsmRecord.of(keyComponents, valueCoder.decode(inStream));
}
}
public int getNumberOfShardKeyCoders(List<?> keyComponents) {
if (isMetadataKey(keyComponents)) {
return numberOfMetadataShardKeyCoders;
} else {
return numberOfShardKeyCoders;
}
}
/**
* Computes the shard id for the given key component(s).
*
* <p>The shard keys are encoded into their byte representations and hashed using the <a
* href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">32-bit murmur3 algorithm, x86
* variant</a> (little-endian variant), using {@code 1225801234} as the seed value. We ensure
* that shard ids for metadata keys and normal keys do not overlap.
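*
* <p>A usage sketch, assuming a previously constructed {@code IsmRecordCoder} named
* {@code coder} (the key component value is illustrative). Value record shard ids fall in
* {@code [0, 127]}; metadata record shard ids are offset into {@code [128, 255]}.
*
* <pre>{@code
* int shardId = coder.hash(ImmutableList.of("some key"));
* }</pre>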
*/
public int hash(List<?> keyComponents) {
return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<>());
}
/**
* Computes the shard id for the given key component(s).
*
* <p>Mutates {@code keyBytesToMutate} such that, when this method returns, it contains the
* encoded version of the key components.
*/
public int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<>());
}
/**
* Computes the shard id for the given key component(s).
*
* <p>Mutates {@code keyBytesToMutate} such that, when this method returns, it contains the
* encoded version of the key components. Also mutates {@code keyComponentByteOffsetsToMutate} to
* store the offset at which each key component's encoded byte representation ends within
* {@code keyBytesToMutate}.
*/
public int encodeAndHash(
List<?> keyComponents,
RandomAccessData keyBytesToMutate,
List<Integer> keyComponentByteOffsetsToMutate) {
checkNotNull(keyComponents);
checkArgument(
keyComponents.size() <= keyComponentCoders.size(),
"Expected at most %s key component(s) but received %s.",
keyComponentCoders.size(),
keyComponents);
final int numberOfKeyCodersToUse;
final int shardOffset;
if (isMetadataKey(keyComponents)) {
numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
shardOffset = SHARD_BITS + 1;
} else {
numberOfKeyCodersToUse = numberOfShardKeyCoders;
shardOffset = 0;
}
checkArgument(
numberOfKeyCodersToUse <= keyComponents.size(),
"Expected at least %s key component(s) but received %s.",
numberOfKeyCodersToUse,
keyComponents);
try {
// Encode the shard portion
for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
getKeyComponentCoder(i)
.encode(keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
}
int rval =
HASH_FUNCTION.hashBytes(keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt()
& SHARD_BITS;
rval += shardOffset;
// Encode the remainder
for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
getKeyComponentCoder(i)
.encode(keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
}
return rval;
} catch (IOException e) {
throw new IllegalStateException(
String.format("Failed to hash %s with coder %s", keyComponents, this), e);
}
}
@Override
public List<Coder<?>> getCoderArguments() {
return ImmutableList.<Coder<?>>builder().addAll(keyComponentCoders).add(valueCoder).build();
}
@Override
public void verifyDeterministic() throws Coder.NonDeterministicException {
verifyDeterministic(
this, "Key component coders expected to be deterministic.", keyComponentCoders);
verifyDeterministic(this, "Value coder expected to be deterministic.", valueCoder);
}
@Override
public boolean consistentWithEquals() {
for (Coder<?> keyComponentCoder : keyComponentCoders) {
if (!keyComponentCoder.consistentWithEquals()) {
return false;
}
}
return valueCoder.consistentWithEquals();
}
@Override
public Object structuralValue(IsmRecord<V> record) {
checkNotNull(record);
checkState(
record.getKeyComponents().size() == keyComponentCoders.size(),
"Expected the number of key component coders %s "
+ "to match the number of key components %s.",
keyComponentCoders.size(),
record.getKeyComponents());
if (consistentWithEquals()) {
ArrayList<Object> keyComponentStructuralValues = new ArrayList<>();
for (int i = 0; i < keyComponentCoders.size(); ++i) {
keyComponentStructuralValues.add(
getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
}
if (isMetadataKey(record.getKeyComponents())) {
return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
} else {
return IsmRecord.of(
keyComponentStructuralValues, valueCoder.structuralValue(record.getValue()));
}
}
return super.structuralValue(record);
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (!(other instanceof IsmRecordCoder)) {
return false;
}
IsmRecordCoder<?> that = (IsmRecordCoder<?>) other;
return Objects.equals(this.numberOfShardKeyCoders, that.numberOfShardKeyCoders)
&& Objects.equals(
this.numberOfMetadataShardKeyCoders, that.numberOfMetadataShardKeyCoders)
&& Objects.equals(this.keyComponentCoders, that.keyComponentCoders)
&& Objects.equals(this.valueCoder, that.valueCoder);
}
@Override
public int hashCode() {
return Objects.hash(
numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder);
}
}
/** Validates that the key portion of the given coder is deterministic. */
public static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
for (Coder<?> keyComponentCoder : coder.getKeyComponentCoders()) {
try {
keyComponentCoder.verifyDeterministic();
} catch (NonDeterministicException e) {
throw new IllegalArgumentException(
String.format(
"Key component coder %s is expected to be deterministic.", keyComponentCoder),
e);
}
}
}
/** Returns true if and only if any of the passed-in key components is the metadata key. */
public static boolean isMetadataKey(List<?> keyComponents) {
for (Object keyComponent : keyComponents) {
if (keyComponent == METADATA_KEY) {
return true;
}
}
return false;
}
/** A marker object representing the wildcard metadata key component. */
private static final Object METADATA_KEY =
new Object() {
@Override
public String toString() {
return "META";
}
@Override
public boolean equals(Object obj) {
return this == obj;
}
@Override
public int hashCode() {
return -1248902349;
}
};
/**
* An object representing a wildcard for a key component. Encoded using {@link MetadataKeyCoder}.
*/
public static Object getMetadataKey() {
return METADATA_KEY;
}
/**
* A coder for the metadata key component. It can be used to wrap a key component coder, allowing
* the metadata key component to be used as a placeholder instead of an actual key.
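*
* <p>A minimal sketch (the wrapped coder is illustrative):
*
* <pre>{@code
* MetadataKeyCoder<String> keyCoder = MetadataKeyCoder.of(StringUtf8Coder.of());
* // The metadata key placeholder encodes as the single byte 0x00; any other key encodes as the
* // byte 0x01 followed by the wrapped coder's encoding of that key.
* }</pre>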
*/
public static class MetadataKeyCoder<K> extends StructuredCoder<K> {
public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
checkNotNull(keyCoder);
return new MetadataKeyCoder<>(keyCoder);
}
private final Coder<K> keyCoder;
private MetadataKeyCoder(Coder<K> keyCoder) {
this.keyCoder = keyCoder;
}
public Coder<K> getKeyCoder() {
return keyCoder;
}
@Override
public void encode(K value, OutputStream outStream) throws CoderException, IOException {
if (value == METADATA_KEY) {
outStream.write(0);
} else {
outStream.write(1);
keyCoder.encode(value, outStream);
}
}
@Override
public K decode(InputStream inStream) throws CoderException, IOException {
int marker = inStream.read();
if (marker == 0) {
return (K) getMetadataKey();
} else if (marker == 1) {
return keyCoder.decode(inStream);
} else {
throw new CoderException(String.format("Expected marker but got %s.", marker));
}
}
@Override
public List<Coder<?>> getCoderArguments() {
return ImmutableList.of(keyCoder);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(this, "Expected key coder to be deterministic", keyCoder);
}
}
/**
* A shard descriptor containing shard id, the data block offset, and the index offset for the
* given shard.
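*
* <p>A minimal sketch (the shard id and offsets are illustrative):
*
* <pre>{@code
* IsmShard shard = IsmShard.of(3, 1024L);           // index offset not yet known
* IsmShard complete = shard.withIndexOffset(4096L); // now safe to call getIndexOffset()
* }</pre>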
*/
@AutoValue
public abstract static class IsmShard {
abstract int id();
abstract long blockOffset();
abstract long indexOffset();
IsmShard() {}
/** Returns an IsmShard with the given id, block offset and no index offset. */
public static IsmShard of(int id, long blockOffset) {
IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, -1);
checkState(id >= 0, "%s attempting to be written with negative shard id.", ismShard);
checkState(
blockOffset >= 0, "%s attempting to be written with negative block offset.", ismShard);
return ismShard;
}
/** Returns an IsmShard with the given id, block offset, and index offset. */
public static IsmShard of(int id, long blockOffset, long indexOffset) {
IsmShard ismShard = new AutoValue_IsmFormat_IsmShard(id, blockOffset, indexOffset);
checkState(id >= 0, "%s attempting to be written with negative shard id.", ismShard);
checkState(
blockOffset >= 0, "%s attempting to be written with negative block offset.", ismShard);
checkState(
indexOffset >= 0, "%s attempting to be written with negative index offset.", ismShard);
return ismShard;
}
/** Returns the shard id. */
public int getId() {
return id();
}
/** Returns the absolute position within the Ism file where the data block begins. */
public long getBlockOffset() {
return blockOffset();
}
/**
* Returns the absolute position within the Ism file where the index block begins. Throws {@link
* IllegalStateException} if the index offset was never specified.
*/
public long getIndexOffset() {
checkState(
indexOffset() >= 0, "Unable to fetch index offset because it was never specified.");
return indexOffset();
}
/** Returns a new IsmShard like this one with the specified index offset. */
public IsmShard withIndexOffset(long indexOffset) {
return of(id(), blockOffset(), indexOffset);
}
}
/**
* A {@link ListCoder} wrapping an {@link IsmShardCoder}, used to encode the shard index. See
* {@link ListCoder} and {@link IsmShardCoder} for their respective encoding specifications.
*/
public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER =
ListCoder.of(IsmShardCoder.of());
/**
* A coder for {@link IsmShard}s.
*
* <p>The shard descriptor is encoded as:
*
* <ul>
* <li>id (variable length integer encoding)
* <li>blockOffset (variable length long encoding)
* <li>indexOffset (variable length long encoding)
* </ul>
*/
public static class IsmShardCoder extends AtomicCoder<IsmShard> {
private static final IsmShardCoder INSTANCE = new IsmShardCoder();
/** Returns an IsmShardCoder. */
public static IsmShardCoder of() {
return INSTANCE;
}
private IsmShardCoder() {}
@Override
public void encode(IsmShard value, OutputStream outStream) throws CoderException, IOException {
checkState(
value.getIndexOffset() >= 0, "%s attempting to be written without index offset.", value);
VarIntCoder.of().encode(value.getId(), outStream);
VarLongCoder.of().encode(value.getBlockOffset(), outStream);
VarLongCoder.of().encode(value.getIndexOffset(), outStream);
}
@Override
public IsmShard decode(InputStream inStream) throws CoderException, IOException {
return IsmShard.of(
VarIntCoder.of().decode(inStream),
VarLongCoder.of().decode(inStream),
VarLongCoder.of().decode(inStream));
}
@Override
public void verifyDeterministic() {
VarIntCoder.of().verifyDeterministic();
VarLongCoder.of().verifyDeterministic();
}
@Override
public boolean consistentWithEquals() {
return true;
}
}
/**
* The prefix written before each key, containing the number of bytes shared with the previously
* read key and the number of unshared bytes that follow. The key prefix, along with the previous
* key and the unshared key bytes, allows one to reconstruct the current key: {@code currentKey =
* previousKey[0 : sharedBytes] + read(unsharedBytes)}.
*
* <p>The key prefix is encoded as:
*
* <ul>
* <li>number of shared key bytes (variable length integer coding)
* <li>number of unshared key bytes (variable length integer coding)
* </ul>
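*
* <p>A worked example (the keys are hypothetical): if the previously read key was
* {@code "beam-001"} and the next key is {@code "beam-002"}, the writer emits
* {@code KeyPrefix.of(7, 1)} followed by the single unshared byte {@code '2'}.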
*/
@AutoValue
public abstract static class KeyPrefix {
public abstract int getSharedKeySize();
public abstract int getUnsharedKeySize();
public static KeyPrefix of(int sharedKeySize, int unsharedKeySize) {
return new AutoValue_IsmFormat_KeyPrefix(sharedKeySize, unsharedKeySize);
}
}
/** A {@link Coder} for {@link KeyPrefix}. */
public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
public static KeyPrefixCoder of() {
return INSTANCE;
}
@Override
public void encode(KeyPrefix value, OutputStream outStream) throws CoderException, IOException {
VarInt.encode(value.getSharedKeySize(), outStream);
VarInt.encode(value.getUnsharedKeySize(), outStream);
}
@Override
public KeyPrefix decode(InputStream inStream) throws CoderException, IOException {
return KeyPrefix.of(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
}
@Override
public void verifyDeterministic() {}
@Override
public boolean consistentWithEquals() {
return true;
}
@Override
public boolean isRegisterByteSizeObserverCheap(KeyPrefix value) {
return true;
}
@Override
public long getEncodedElementByteSize(KeyPrefix value) throws Exception {
checkNotNull(value);
return VarInt.getLength(value.getSharedKeySize())
+ (long) VarInt.getLength(value.getUnsharedKeySize());
}
}
/**
* The footer stores the relevant information required to locate the index and bloom filter. It
* also stores a version byte and the number of keys stored.
*
* <p>The footer is encoded as the value containing:
*
* <ul>
* <li>start of bloom filter offset (big endian long coding)
* <li>start of shard index position offset (big endian long coding)
* <li>number of keys in file (big endian long coding)
* <li>version (a single byte, currently 0x02)
* </ul>
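*
* <p>Because the footer has a fixed length ({@link #FIXED_LENGTH}, 25 bytes), a reader can locate
* it by decoding the final bytes of the file. A minimal sketch (the surrounding file-access code,
* such as {@code fileLength} and {@code inputStream}, is illustrative and not part of this class):
*
* <pre>{@code
* long footerStart = fileLength - Footer.FIXED_LENGTH;
* // Seek the input stream to footerStart, then:
* Footer footer = FooterCoder.of().decode(inputStream);
* }</pre>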
*/
@AutoValue
public abstract static class Footer {
public static final int LONG_BYTES = 8;
public static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
public static final byte VERSION = 2;
public abstract byte getVersion();
public abstract long getIndexPosition();
public abstract long getBloomFilterPosition();
public abstract long getNumberOfKeys();
public static Footer of(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
return new AutoValue_IsmFormat_Footer(
VERSION, indexPosition, bloomFilterPosition, numberOfKeys);
}
}
/** A {@link Coder} for {@link Footer}. */
public static final class FooterCoder extends AtomicCoder<Footer> {
private static final FooterCoder INSTANCE = new FooterCoder();
public static FooterCoder of() {
return INSTANCE;
}
@Override
public void encode(Footer value, OutputStream outStream) throws CoderException, IOException {
DataOutputStream dataOut = new DataOutputStream(outStream);
dataOut.writeLong(value.getIndexPosition());
dataOut.writeLong(value.getBloomFilterPosition());
dataOut.writeLong(value.getNumberOfKeys());
dataOut.write(Footer.VERSION);
}
@Override
public Footer decode(InputStream inStream) throws CoderException, IOException {
DataInputStream dataIn = new DataInputStream(inStream);
Footer footer = Footer.of(dataIn.readLong(), dataIn.readLong(), dataIn.readLong());
int version = dataIn.read();
if (version != Footer.VERSION) {
throw new IOException(
"Unknown version " + version + ". Only version 2 is currently supported.");
}
return footer;
}
@Override
public void verifyDeterministic() {}
@Override
public boolean consistentWithEquals() {
return true;
}
@Override
public boolean isRegisterByteSizeObserverCheap(Footer value) {
return true;
}
@Override
public long getEncodedElementByteSize(Footer value) throws Exception {
return Footer.FIXED_LENGTH;
}
}
}