/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.file.rfile.bcfile;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
import org.apache.accumulo.core.crypto.CryptoUtils;
import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.spi.crypto.FileDecrypter;
import org.apache.accumulo.core.spi.crypto.FileEncrypter;
import org.apache.accumulo.core.spi.crypto.NoFileDecrypter;
import org.apache.accumulo.core.spi.crypto.NoFileEncrypter;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
/**
* Block Compressed file, the underlying physical storage layer for TFile. BCFile provides basic
* block-level compression for the data and meta blocks. It is separated from TFile as it may be
* used for other block-compressed file implementations.
*/
public final class BCFile {
// the current version of the BCFile impl; increment the major or minor version when enough
// changes have been made
/**
* Simplified encryption interface. Allows more flexible encryption.
*
* @since 2.0
*/
static final Version API_VERSION_3 = new Version((short) 3, (short) 0);
/**
* Experimental crypto parameters, not flexible. Do not use.
*/
static final Version API_VERSION_2 = new Version((short) 2, (short) 0);
/**
* Original BCFile version, prior to encryption. Also, any files before 2.0 that didn't have
* encryption were marked with this version.
*/
static final Version API_VERSION_1 = new Version((short) 1, (short) 0);
static final Log LOG = LogFactory.getLog(BCFile.class);
private static final String FS_OUTPUT_BUF_SIZE_ATTR = "tfile.fs.output.buffer.size";
private static final String FS_INPUT_BUF_SIZE_ATTR = "tfile.fs.input.buffer.size";
private static int getFSOutputBufferSize(Configuration conf) {
return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
}
private static int getFSInputBufferSize(Configuration conf) {
return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 32 * 1024);
}
/**
* Prevent the instantiation of BCFile objects.
*/
private BCFile() {
// nothing
}
/**
* BCFile writer, the entry point for creating a new BCFile.
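*
* <p>
* A minimal usage sketch (not taken from this file): the output stream, rate limiter,
* configuration, crypto service, compression name, block name, and payload below are assumed to
* be supplied by the caller and are illustrative only.
*
* <pre>{@code
* try (Writer writer = new Writer(fsOut, writeLimiter, "gz", conf, cryptoService)) {
*   try (BlockAppender data = writer.prepareDataBlock()) {
*     data.write(payloadBytes); // data blocks must be written before any meta blocks
*   }
*   try (BlockAppender meta = writer.prepareMetaBlock("stats")) {
*     meta.writeUTF("example metadata");
*   }
* }
* }</pre>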
*/
public static class Writer implements Closeable {
private final RateLimitedOutputStream out;
private final Configuration conf;
private FileEncrypter encrypter;
private CryptoEnvironmentImpl cryptoEnvironment;
// the single meta block containing index of compressed data blocks
final DataIndex dataIndex;
// index for meta blocks
final MetaIndex metaIndex;
boolean blkInProgress = false;
private boolean metaBlkSeen = false;
private boolean closed = false;
long errorCount = 0;
// reusable buffers.
private BytesWritable fsOutputBuffer;
private long length = 0;
public long getLength() {
return this.length;
}
/**
* Intermediate class that maintains the state of a Writable Compression Block.
*/
private static final class WBlockState {
private final Algorithm compressAlgo;
private Compressor compressor; // !null only if using native
// Hadoop compression
private final RateLimitedOutputStream fsOut;
private final OutputStream cipherOut;
private final long posStart;
private final SimpleBufferedOutputStream fsBufferedOutput;
private OutputStream out;
public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut,
BytesWritable fsOutputBuffer, Configuration conf, FileEncrypter encrypter)
throws IOException {
this.compressAlgo = compressionAlgo;
this.fsOut = fsOut;
this.posStart = fsOut.position();
fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf));
this.fsBufferedOutput =
new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
this.compressor = compressAlgo.getCompressor();
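// Stream stack for writes: bytes written to 'out' are compressed, then encrypted, then
// buffered before reaching the underlying rate-limited file stream.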
try {
this.cipherOut = encrypter.encryptStream(fsBufferedOutput);
this.out = compressionAlgo.createCompressionStream(cipherOut, compressor, 0);
} catch (IOException e) {
compressAlgo.returnCompressor(compressor);
throw e;
}
}
/**
* Get the output stream for BlockAppender's consumption.
*
* @return the output stream suitable for writing block data.
*/
OutputStream getOutputStream() {
return out;
}
/**
* Get the current position in file.
*
* @return The current byte offset in underlying file.
*/
long getCurrentPos() {
return fsOut.position() + fsBufferedOutput.size();
}
long getStartPos() {
return posStart;
}
/**
* Current size of compressed data.
*/
long getCompressedSize() {
return getCurrentPos() - posStart;
}
/**
* Finishing up the current block.
*/
public void finish() throws IOException {
try {
if (out != null) {
out.flush();
// If the cipherOut stream is different from the fsBufferedOutput stream, then we likely
// have an actual encrypted output stream that needs to be closed in order for it
// to flush the final bytes to the output stream. We should have set the flag to
// make sure that this close does *not* close the underlying stream, so calling
// close here should do the right thing.
if (fsBufferedOutput != cipherOut) {
// Close the cipherOutputStream
cipherOut.close();
}
out = null;
}
} finally {
compressAlgo.returnCompressor(compressor);
compressor = null;
}
}
}
/**
* Access point for writing data into a block.
*/
public class BlockAppender extends DataOutputStream {
private final MetaBlockRegister metaBlockRegister;
private final WBlockState wBlkState;
private boolean closed = false;
/**
* Constructor
*
* @param metaBlockRegister
* the block register, which is called when the block is closed.
* @param wbs
* The writable compression block state.
*/
BlockAppender(MetaBlockRegister metaBlockRegister, WBlockState wbs) {
super(wbs.getOutputStream());
this.metaBlockRegister = metaBlockRegister;
this.wBlkState = wbs;
}
BlockAppender(WBlockState wbs) {
super(wbs.getOutputStream());
this.metaBlockRegister = null;
this.wBlkState = wbs;
}
/**
* Get the raw size of the block.
*
* Caution: size() comes from DataOutputStream which returns Integer.MAX_VALUE on an overflow.
* This results in a value of 2GiB meaning that an unknown amount of data, at least 2GiB
* large, has been written. RFiles handle this issue by keeping track of the position of
* blocks instead of relying on blocks to provide this information.
*
* @return the number of uncompressed bytes written through the BlockAppender so far.
*/
public long getRawSize() {
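// size() is an int that saturates at Integer.MAX_VALUE; mask it to treat the value as unsigned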
return size() & 0x00000000ffffffffL;
}
/**
* Get the compressed size of the block in progress.
*
* @return the number of compressed bytes written to the underlying FS file. The size may be
* smaller than what is actually needed to compress all of the data written, due to internal
* buffering inside the compressor.
*/
public long getCompressedSize() throws IOException {
return wBlkState.getCompressedSize();
}
public long getStartPos() {
return wBlkState.getStartPos();
}
@Override
public void flush() {
// The downstream is a special kind of stream that finishes a
// compression block upon flush, so flush() is disabled here.
}
/**
* Signals the end of writing to the block. The block register will be called to register the
* finished block.
*/
@Override
public void close() throws IOException {
if (closed) {
return;
}
try {
++errorCount;
wBlkState.finish();
if (metaBlockRegister != null)
metaBlockRegister.register(getRawSize(), wBlkState.getStartPos(),
wBlkState.getCurrentPos());
--errorCount;
} finally {
closed = true;
blkInProgress = false;
}
}
}
/**
* Constructor
*
* @param fout
* FS output stream.
* @param writeLimiter
* rate limiter applied to data written to the underlying stream.
* @param compressionName
* Name of the compression algorithm, which will be used for all data blocks.
* @param conf
* Hadoop configuration, used to size the output buffer.
* @param cryptoService
* provides the FileEncrypter used to encrypt block data.
* @see Compression#getSupportedAlgorithms
*/
public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName,
Configuration conf, CryptoService cryptoService) throws IOException {
if (fout.getPos() != 0) {
throw new IOException("Output file not at zero offset.");
}
this.out = new RateLimitedOutputStream(fout, writeLimiter);
this.conf = conf;
dataIndex = new DataIndex(compressionName);
metaIndex = new MetaIndex();
fsOutputBuffer = new BytesWritable();
Magic.write(this.out);
this.cryptoEnvironment = new CryptoEnvironmentImpl(Scope.RFILE, null);
this.encrypter = cryptoService.getFileEncrypter(this.cryptoEnvironment);
}
/**
* Close the BCFile Writer. Attempting to use the Writer after calling <code>close</code> is not
* allowed and may lead to undetermined results.
*/
@Override
public void close() throws IOException {
if (closed) {
return;
}
try {
if (errorCount == 0) {
if (blkInProgress) {
throw new IllegalStateException("Close() called with active block appender.");
}
// add metaBCFileIndex to metaIndex as the last meta block
try (BlockAppender appender =
prepareMetaBlock(DataIndex.BLOCK_NAME, getDefaultCompressionAlgorithm())) {
dataIndex.write(appender);
}
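// File tail written below: meta index, crypto parameters (length-prefixed), the two offsets
// pointing back at them, the API version, and the trailing magic bytes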
long offsetIndexMeta = out.position();
metaIndex.write(out);
long offsetCryptoParameter = out.position();
byte[] cryptoParams = this.encrypter.getDecryptionParameters();
out.writeInt(cryptoParams.length);
out.write(cryptoParams);
out.writeLong(offsetIndexMeta);
out.writeLong(offsetCryptoParameter);
API_VERSION_3.write(out);
Magic.write(out);
out.flush();
length = out.position();
out.close();
}
} finally {
closed = true;
}
}
private Algorithm getDefaultCompressionAlgorithm() {
return dataIndex.getDefaultCompressionAlgorithm();
}
private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
throws IOException, MetaBlockAlreadyExists {
if (blkInProgress) {
throw new IllegalStateException("Cannot create Meta Block until previous block is closed.");
}
if (metaIndex.getMetaByName(name) != null) {
throw new MetaBlockAlreadyExists("name=" + name);
}
MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf, encrypter);
BlockAppender ba = new BlockAppender(mbr, wbs);
blkInProgress = true;
metaBlkSeen = true;
return ba;
}
/**
* Create a Meta Block and obtain an output stream for adding data into the block. The Meta
* Block will be compressed with the same compression algorithm as data blocks. There can only
* be one BlockAppender stream active at any time. Regular Blocks may not be created after the
* first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
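*
* <p>
* A minimal sketch of the expected calling pattern (the block name and value are illustrative):
*
* <pre>{@code
* try (BlockAppender app = writer.prepareMetaBlock("stats")) {
*   app.writeLong(blockCount);
* } catch (MetaBlockAlreadyExists e) {
*   // a meta block named "stats" has already been written
* }
* }</pre>
*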
*
* @param name
* The name of the Meta Block. The name must not conflict with existing Meta Blocks.
* @return The BlockAppender stream
* @throws MetaBlockAlreadyExists
* If the meta block with the name already exists.
*/
public BlockAppender prepareMetaBlock(String name) throws IOException, MetaBlockAlreadyExists {
return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
}
/**
* Create a Data Block and obtain an output stream for adding data into the block. There can
* only be one BlockAppender stream active at any time. Data Blocks may not be created after the
* first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
*
* @return The BlockAppender stream
*/
public BlockAppender prepareDataBlock() throws IOException {
if (blkInProgress) {
throw new IllegalStateException("Cannot create Data Block until previous block is closed.");
}
if (metaBlkSeen) {
throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
}
WBlockState wbs =
new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf, encrypter);
BlockAppender ba = new BlockAppender(wbs);
blkInProgress = true;
return ba;
}
/**
* Callback to make sure a meta block is added to the internal list when its stream is closed.
*/
private class MetaBlockRegister {
private final String name;
private final Algorithm compressAlgo;
MetaBlockRegister(String name, Algorithm compressAlgo) {
this.name = name;
this.compressAlgo = compressAlgo;
}
public void register(long raw, long begin, long end) {
metaIndex.addEntry(
new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
}
}
}
/**
* BCFile Reader, interface to read the file's data and meta blocks.
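*
* <p>
* A minimal usage sketch (not taken from this file): the seekable input stream, file length,
* configuration, and crypto service are assumed to be supplied by the caller.
*
* <pre>{@code
* try (Reader reader = new Reader(seekableIn, fileLength, conf, cryptoService)) {
*   for (int i = 0; i < reader.getBlockCount(); i++) {
*     try (BlockReader block = reader.getDataBlock(i)) {
*       byte[] raw = new byte[(int) block.getRawSize()]; // assumes each block fits in an int
*       block.readFully(raw);
*     }
*   }
* }
* }</pre>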
*/
public static class Reader implements Closeable {
private final SeekableDataInputStream in;
private final Configuration conf;
final DataIndex dataIndex;
// Index for meta blocks
final MetaIndex metaIndex;
final Version version;
private byte[] decryptionParams;
private FileDecrypter decrypter;
/**
* Intermediate class that maintains the state of a Readable Compression Block.
*/
private static final class RBlockState {
private final Algorithm compressAlgo;
private Decompressor decompressor;
private final BlockRegion region;
private final InputStream in;
private volatile boolean closed;
public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo,
InputStreamType fsin, BlockRegion region, Configuration conf, FileDecrypter decrypter)
throws IOException {
this.compressAlgo = compressionAlgo;
this.region = region;
this.decompressor = compressionAlgo.getDecompressor();
BoundedRangeFileInputStream boundedRangeFileInputStream = new BoundedRangeFileInputStream(
fsin, this.region.getOffset(), this.region.getCompressedSize());
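// Stream stack for reads: a bounded range over the raw file is decrypted, then decompressed.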
try {
InputStream inputStreamToBeDecompressed =
decrypter.decryptStream(boundedRangeFileInputStream);
this.in = compressAlgo.createDecompressionStream(inputStreamToBeDecompressed, decompressor,
getFSInputBufferSize(conf));
} catch (IOException e) {
compressAlgo.returnDecompressor(decompressor);
throw e;
}
closed = false;
}
/**
* Get the input stream for BlockReader's consumption.
*
* @return the input stream suitable for reading block data.
*/
public InputStream getInputStream() {
return in;
}
public BlockRegion getBlockRegion() {
return region;
}
public void finish() throws IOException {
synchronized (in) {
if (!closed) {
try {
in.close();
} finally {
closed = true;
if (decompressor != null) {
try {
compressAlgo.returnDecompressor(decompressor);
} finally {
decompressor = null;
}
}
}
}
}
}
}
/**
* Access point to read a block.
*/
public static class BlockReader extends DataInputStream {
private final RBlockState rBlkState;
private boolean closed = false;
BlockReader(RBlockState rbs) {
super(rbs.getInputStream());
rBlkState = rbs;
}
/**
* Finish reading the block and release all resources.
*/
@Override
public void close() throws IOException {
if (closed) {
return;
}
try {
// Do not set rBlkState to null. People may access stats after calling
// close().
rBlkState.finish();
} finally {
closed = true;
}
}
/**
* Get the uncompressed size of the block.
*
* @return uncompressed size of the block.
*/
public long getRawSize() {
return rBlkState.getBlockRegion().getRawSize();
}
}
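/**
* Serialize the meta index, data index, and decryption parameters so a Reader can later be
* constructed without re-reading the file tail. Returns null if the serialized form would exceed
* maxSize.
*/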
public byte[] serializeMetadata(int maxSize) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
metaIndex.write(out);
if (out.size() > maxSize) {
return null;
}
dataIndex.write(out);
if (out.size() > maxSize) {
return null;
}
CryptoUtils.writeParams(this.decryptionParams, out);
if (out.size() > maxSize) {
return null;
}
out.close();
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fin,
long fileLength, Configuration conf, CryptoService cryptoService) throws IOException {
this.in = new SeekableDataInputStream(fin);
this.conf = conf;
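// The file tail, as written by Writer.close(), ends with: meta index offset (long), crypto
// parameters offset (long, version 3 only), Version, Magic.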
// Move the cursor to grab the version and the magic first
this.in.seek(fileLength - Magic.size() - Version.size());
version = new Version(this.in);
Magic.readAndVerify(this.in);
// Do a version check - API_VERSION_2 used experimental crypto parameters, no longer supported
if (!version.compatibleWith(BCFile.API_VERSION_3)
&& !version.compatibleWith(BCFile.API_VERSION_1)) {
throw new IOException("Unsupported BCFile Version found: " + version.toString() + ". "
+ "Only support " + API_VERSION_1 + " or " + API_VERSION_3);
}
// Read the right number of offsets based on the version
long offsetIndexMeta = 0;
long offsetCryptoParameters = 0;
if (version.equals(API_VERSION_1)) {
this.in.seek(fileLength - Magic.size() - Version.size() - Long.BYTES);
offsetIndexMeta = this.in.readLong();
} else {
this.in.seek(fileLength - Magic.size() - Version.size() - 16); // 2 * Long.BYTES = 16
offsetIndexMeta = this.in.readLong();
offsetCryptoParameters = this.in.readLong();
}
// read meta index
this.in.seek(offsetIndexMeta);
metaIndex = new MetaIndex(this.in);
CryptoEnvironment cryptoEnvironment = null;
// backwards compatibility
if (version.equals(API_VERSION_1)) {
LOG.trace("Found a version 1 file to read.");
decryptionParams = new NoFileEncrypter().getDecryptionParameters();
this.decrypter = new NoFileDecrypter();
} else {
// read crypto parameters and get decrypter
this.in.seek(offsetCryptoParameters);
decryptionParams = CryptoUtils.readParams(this.in);
cryptoEnvironment = new CryptoEnvironmentImpl(Scope.RFILE, decryptionParams);
this.decrypter = cryptoService.getFileDecrypter(cryptoEnvironment);
}
// read data:BCFile.index, the data block index
try (BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME)) {
dataIndex = new DataIndex(blockR);
}
}
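/**
* Construct a Reader from metadata previously produced by {@link #serializeMetadata(int)},
* avoiding the seeks needed to parse the file tail; the version and magic are not re-verified
* here.
*/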
public <InputStreamType extends InputStream & Seekable> Reader(byte[] serializedMetadata,
InputStreamType fin, Configuration conf, CryptoService cryptoService) throws IOException {
this.in = new SeekableDataInputStream(fin);
this.conf = conf;
ByteArrayInputStream bais = new ByteArrayInputStream(serializedMetadata);
DataInputStream dis = new DataInputStream(bais);
version = null;
metaIndex = new MetaIndex(dis);
dataIndex = new DataIndex(dis);
decryptionParams = CryptoUtils.readParams(dis);
CryptoEnvironmentImpl env = new CryptoEnvironmentImpl(Scope.RFILE, decryptionParams);
this.decrypter = cryptoService.getFileDecrypter(env);
}
/**
* Finish reading the BCFile and release all resources.
*/
@Override
public void close() {
// nothing to be done now
}
/**
* Get the number of data blocks.
*
* @return the number of data blocks.
*/
public int getBlockCount() {
return dataIndex.getBlockRegionList().size();
}
/**
* Stream access to a Meta Block.
*
* @param name
* meta block name
* @return BlockReader input stream for reading the meta block.
* @throws MetaBlockDoesNotExist
* The Meta Block with the given name does not exist.
*/
public BlockReader getMetaBlock(String name) throws IOException, MetaBlockDoesNotExist {
MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
if (imeBCIndex == null) {
throw new MetaBlockDoesNotExist("name=" + name);
}
BlockRegion region = imeBCIndex.getRegion();
return createReader(imeBCIndex.getCompressionAlgorithm(), region);
}
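/**
* Get the uncompressed size of the named Meta Block without opening a stream to it.
*/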
public long getMetaBlockRawSize(String name) throws IOException, MetaBlockDoesNotExist {
MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
if (imeBCIndex == null) {
throw new MetaBlockDoesNotExist("name=" + name);
}
return imeBCIndex.getRegion().getRawSize();
}
/**
* Stream access to a Data Block.
*
* @param blockIndex
* 0-based data block index.
* @return BlockReader input stream for reading the data block.
*/
public BlockReader getDataBlock(int blockIndex) throws IOException {
if (blockIndex < 0 || blockIndex >= getBlockCount()) {
throw new IndexOutOfBoundsException(
String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
}
BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
}
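/**
* Stream access to a Data Block identified directly by its region (offset, compressed size, and
* raw size) rather than by block index.
*/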
public BlockReader getDataBlock(long offset, long compressedSize, long rawSize)
throws IOException {
BlockRegion region = new BlockRegion(offset, compressedSize, rawSize);
return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
}
public long getDataBlockRawSize(int blockIndex) {
if (blockIndex < 0 || blockIndex >= getBlockCount()) {
throw new IndexOutOfBoundsException(
String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
}
return dataIndex.getBlockRegionList().get(blockIndex).getRawSize();
}
private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
throws IOException {
RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, decrypter);
return new BlockReader(rbs);
}
}
/**
* Index for all Meta blocks.
*/
static class MetaIndex {
// use a tree map, for getting a meta block entry by name
final Map<String,MetaIndexEntry> index;
// for write
public MetaIndex() {
index = new TreeMap<>();
}
// for read, construct the map from the file
public MetaIndex(DataInput in) throws IOException {
int count = Utils.readVInt(in);
index = new TreeMap<>();
for (int nx = 0; nx < count; nx++) {
MetaIndexEntry indexEntry = new MetaIndexEntry(in);
index.put(indexEntry.getMetaName(), indexEntry);
}
}
public void addEntry(MetaIndexEntry indexEntry) {
index.put(indexEntry.getMetaName(), indexEntry);
}
public MetaIndexEntry getMetaByName(String name) {
return index.get(name);
}
public void write(DataOutput out) throws IOException {
Utils.writeVInt(out, index.size());
for (MetaIndexEntry indexEntry : index.values()) {
indexEntry.write(out);
}
}
}
/**
* An entry describes a meta block in the MetaIndex.
*/
static final class MetaIndexEntry {
private final String metaName;
private final Algorithm compressionAlgorithm;
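// meta block names are stored on disk with this prefix prepended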
private static final String defaultPrefix = "data:";
private final BlockRegion region;
public MetaIndexEntry(DataInput in) throws IOException {
String fullMetaName = Utils.readString(in);
if (fullMetaName.startsWith(defaultPrefix)) {
metaName = fullMetaName.substring(defaultPrefix.length(), fullMetaName.length());
} else {
throw new IOException("Corrupted Meta region Index");
}
compressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
region = new BlockRegion(in);
}
public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm, BlockRegion region) {
this.metaName = metaName;
this.compressionAlgorithm = compressionAlgorithm;
this.region = region;
}
public String getMetaName() {
return metaName;
}
public Algorithm getCompressionAlgorithm() {
return compressionAlgorithm;
}
public BlockRegion getRegion() {
return region;
}
public void write(DataOutput out) throws IOException {
Utils.writeString(out, defaultPrefix + metaName);
Utils.writeString(out, compressionAlgorithm.getName());
region.write(out);
}
}
/**
* Index of all compressed data blocks.
*/
static class DataIndex {
static final String BLOCK_NAME = "BCFile.index";
private final Algorithm defaultCompressionAlgorithm;
// for data blocks, each entry specifies a block's offset, compressed size
// and raw size
private final ArrayList<BlockRegion> listRegions;
// for read, deserialized from a file
public DataIndex(DataInput in) throws IOException {
defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
int n = Utils.readVInt(in);
listRegions = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
BlockRegion region = new BlockRegion(in);
listRegions.add(region);
}
}
// for write
public DataIndex(String defaultCompressionAlgorithmName) {
this.defaultCompressionAlgorithm =
Compression.getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
listRegions = new ArrayList<>();
}
public Algorithm getDefaultCompressionAlgorithm() {
return defaultCompressionAlgorithm;
}
public ArrayList<BlockRegion> getBlockRegionList() {
return listRegions;
}
public void write(DataOutput out) throws IOException {
Utils.writeString(out, defaultCompressionAlgorithm.getName());
Utils.writeVInt(out, listRegions.size());
for (BlockRegion region : listRegions) {
region.write(out);
}
}
}
/**
* Magic number uniquely identifying a BCFile in the header/footer.
*/
static final class Magic {
private static final byte[] AB_MAGIC_BCFILE = {
// ... total of 16 bytes
(byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91, (byte) 0xb5, (byte) 0xd7,
(byte) 0xb6, (byte) 0x39, (byte) 0xdf, (byte) 0x41, (byte) 0x40, (byte) 0x92, (byte) 0xba,
(byte) 0xe1, (byte) 0x50};
public static void readAndVerify(DataInput in) throws IOException {
byte[] abMagic = new byte[size()];
in.readFully(abMagic);
// check against AB_MAGIC_BCFILE, if not matching, throw an
// Exception
if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
throw new IOException("Not a valid BCFile.");
}
}
public static void write(DataOutput out) throws IOException {
out.write(AB_MAGIC_BCFILE);
}
public static int size() {
return AB_MAGIC_BCFILE.length;
}
}
/**
* Block region.
*/
static final class BlockRegion {
private final long offset;
private final long compressedSize;
private final long rawSize;
public BlockRegion(DataInput in) throws IOException {
offset = Utils.readVLong(in);
compressedSize = Utils.readVLong(in);
rawSize = Utils.readVLong(in);
}
public BlockRegion(long offset, long compressedSize, long rawSize) {
this.offset = offset;
this.compressedSize = compressedSize;
this.rawSize = rawSize;
}
public void write(DataOutput out) throws IOException {
Utils.writeVLong(out, offset);
Utils.writeVLong(out, compressedSize);
Utils.writeVLong(out, rawSize);
}
public long getOffset() {
return offset;
}
public long getCompressedSize() {
return compressedSize;
}
public long getRawSize() {
return rawSize;
}
}
}