| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.io.hfile; |
| |
| import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED; |
| |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hbase.ByteBufferExtendedCell; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellComparator; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValueUtil; |
| import org.apache.hadoop.hbase.MetaCellComparator; |
| import org.apache.hadoop.hbase.PrivateCellUtil; |
| import org.apache.hadoop.hbase.io.compress.Compression; |
| import org.apache.hadoop.hbase.io.crypto.Encryption; |
| import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; |
| import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding; |
| import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; |
| import org.apache.hadoop.hbase.security.EncryptionUtil; |
| import org.apache.hadoop.hbase.security.User; |
| import org.apache.hadoop.hbase.util.BloomFilterWriter; |
| import org.apache.hadoop.hbase.util.ByteBufferUtils; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Common functionality needed by all versions of {@link HFile} writers. |
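 * <p>
 * A minimal usage sketch, assuming a {@code Configuration conf}, a {@code CacheConfig cacheConf}
 * and a {@code FileSystem fs} are in scope (the path and schema values are illustrative only):
 *
 * <pre>{@code
 * HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
 * try (HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
 *   .withPath(fs, new Path("/tmp/example.hfile"))
 *   .withFileContext(context)
 *   .create()) {
 *   // Cells must be appended in the order given by the file's CellComparator.
 *   writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("q"),
 *     Bytes.toBytes("value")));
 * }
 * }</pre>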
| */ |
| @InterfaceAudience.Private |
| public class HFileWriterImpl implements HFile.Writer { |
| private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class); |
| |
| private static final long UNSET = -1; |
| |
  /** If this feature is enabled, pre-calculate the encoded data size before the real encoding happens */
| public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO = |
| "hbase.writer.unified.encoded.blocksize.ratio"; |
| |
  /** Block size limit after encoding, used to keep encoded block cache entries a uniform size */
| private final int encodedBlockSizeLimit; |
| |
| /** The Cell previously appended. Becomes the last cell in the file. */ |
| protected Cell lastCell = null; |
| |
| /** FileSystem stream to write into. */ |
| protected FSDataOutputStream outputStream; |
| |
| /** True if we opened the <code>outputStream</code> (and so will close it). */ |
| protected final boolean closeOutputStream; |
| |
| /** A "file info" block: a key-value map of file-wide metadata. */ |
| protected HFileInfo fileInfo = new HFileInfo(); |
| |
  /** Total # of key/value entries, i.e. how many times append() was called. */
| protected long entryCount = 0; |
| |
| /** Used for calculating the average key length. */ |
| protected long totalKeyLength = 0; |
| |
| /** Used for calculating the average value length. */ |
| protected long totalValueLength = 0; |
| |
  /** Length of the biggest cell. */
| protected long lenOfBiggestCell = 0; |
| /** Key of the biggest cell. */ |
| protected byte[] keyOfBiggestCell; |
| |
  /** Total uncompressed bytes; may be used to calculate a compression ratio later. */
| protected long totalUncompressedBytes = 0; |
| |
| /** Meta block names. */ |
| protected List<byte[]> metaNames = new ArrayList<>(); |
| |
| /** {@link Writable}s representing meta block data. */ |
| protected List<Writable> metaData = new ArrayList<>(); |
| |
| /** |
| * First cell in a block. This reference should be short-lived since we write hfiles in a burst. |
| */ |
| protected Cell firstCellInBlock = null; |
| |
| /** May be null if we were passed a stream. */ |
| protected final Path path; |
| |
| /** Cache configuration for caching data on write. */ |
| protected final CacheConfig cacheConf; |
| |
| /** |
   * Name for this object used when logging or in toString. Either the result of calling toString
   * on the stream, or the name of the Path this file was written to.
| */ |
| protected final String name; |
| |
| /** |
| * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is |
| * no encoding. |
| */ |
| protected final HFileDataBlockEncoder blockEncoder; |
| |
| protected final HFileIndexBlockEncoder indexBlockEncoder; |
| |
| protected final HFileContext hFileContext; |
| |
| private int maxTagsLength = 0; |
| |
| /** KeyValue version in FileInfo */ |
| public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION"); |
| |
| /** Version for KeyValue which includes memstore timestamp */ |
| public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1; |
| |
| /** Inline block writers for multi-level block index and compound Blooms. */ |
| private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>(); |
| |
| /** block writer */ |
| protected HFileBlock.Writer blockWriter; |
| |
| private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter; |
| private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter; |
| |
| /** The offset of the first data block or -1 if the file is empty. */ |
| private long firstDataBlockOffset = UNSET; |
| |
  /** The offset of the last data block or -1 if the file is empty. */
| protected long lastDataBlockOffset = UNSET; |
| |
| /** |
| * The last(stop) Cell of the previous data block. This reference should be short-lived since we |
| * write hfiles in a burst. |
| */ |
| private Cell lastCellOfPreviousBlock = null; |
| |
| /** Additional data items to be written to the "load-on-open" section. */ |
| private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>(); |
| |
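  /**
   * Highest memstore timestamp (sequence id) among appended cells; written to the file info on
   * close when MVCC info is included.
   */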
| protected long maxMemstoreTS = 0; |
| |
| public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path, |
| FSDataOutputStream outputStream, HFileContext fileContext) { |
| this.outputStream = outputStream; |
| this.path = path; |
| this.name = path != null ? path.getName() : outputStream.toString(); |
| this.hFileContext = fileContext; |
| DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); |
| if (encoding != DataBlockEncoding.NONE) { |
| this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); |
| } else { |
| this.blockEncoder = NoOpDataBlockEncoder.INSTANCE; |
| } |
| IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding(); |
| if (indexBlockEncoding != IndexBlockEncoding.NONE) { |
| this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding); |
| } else { |
| this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE; |
| } |
| closeOutputStream = path != null; |
| this.cacheConf = cacheConf; |
| float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f); |
| this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio); |
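    // For example, with the default 64 KiB block size and a ratio of 0.5f, data blocks are
    // finished once roughly 32 KiB of encoded bytes have been written; see checkBlockBoundary().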
| |
| finishInit(conf); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: " |
| + cacheConf + " fileContext: " + fileContext); |
| } |
| } |
| |
| /** |
| * Add to the file info. All added key/value pairs can be obtained using |
| * {@link HFile.Reader#getHFileInfo()}. |
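   * <p>
   * A small usage sketch (the key name is an arbitrary example):
   *
   * <pre>{@code
   * writer.appendFileInfo(Bytes.toBytes("MY_METADATA_KEY"), Bytes.toBytes("my value"));
   * }</pre>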
| * @param k Key |
| * @param v Value |
| * @throws IOException in case the key or the value are invalid |
| */ |
| @Override |
| public void appendFileInfo(final byte[] k, final byte[] v) throws IOException { |
| fileInfo.append(k, v, true); |
| } |
| |
| /** |
| * Sets the file info offset in the trailer, finishes up populating fields in the file info, and |
| * writes the file info into the given data output. The reason the data output is not always |
| * {@link #outputStream} is that we store file info as a block in version 2. |
| * @param trailer fixed file trailer |
| * @param out the data output to write the file info to |
| */ |
| protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out) |
| throws IOException { |
| trailer.setFileInfoOffset(outputStream.getPos()); |
| finishFileInfo(); |
| long startTime = EnvironmentEdgeManager.currentTime(); |
| fileInfo.write(out); |
| HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); |
| } |
| |
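  /** Returns the current write position in the underlying output stream. */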
  public long getPos() throws IOException {
    return outputStream.getPos();
  }
| |
| /** |
| * Checks that the given Cell's key does not violate the key order. |
| * @param cell Cell whose key to check. |
| * @return true if the key is duplicate |
| * @throws IOException if the key or the key order is wrong |
| */ |
| protected boolean checkKey(final Cell cell) throws IOException { |
| boolean isDuplicateKey = false; |
| |
| if (cell == null) { |
| throw new IOException("Key cannot be null or empty"); |
| } |
| if (lastCell != null) { |
| int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(), |
| lastCell, cell); |
| if (keyComp > 0) { |
| String message = getLexicalErrorMessage(cell); |
| throw new IOException(message); |
| } else if (keyComp == 0) { |
| isDuplicateKey = true; |
| } |
| } |
| return isDuplicateKey; |
| } |
| |
| private String getLexicalErrorMessage(Cell cell) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Added a key not lexically larger than previous. Current cell = "); |
| sb.append(cell); |
| sb.append(", lastCell = "); |
| sb.append(lastCell); |
| // file context includes HFile path and optionally table and CF of file being written |
| sb.append("fileContext="); |
| sb.append(hFileContext); |
| return sb.toString(); |
| } |
| |
| /** Checks the given value for validity. */ |
| protected void checkValue(final byte[] value, final int offset, final int length) |
| throws IOException { |
| if (value == null) { |
| throw new IOException("Value cannot be null"); |
| } |
| } |
| |
| /** Returns Path or null if we were passed a stream rather than a Path. */ |
| @Override |
| public Path getPath() { |
| return path; |
| } |
| |
| @Override |
| public String toString() { |
| return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression=" |
| + hFileContext.getCompression().getName(); |
| } |
| |
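  /**
   * Resolve a {@link Compression.Algorithm} from its configured name, falling back to the default
   * algorithm when the name is null. A small usage sketch (the "gz" name assumes the gzip codec is
   * registered under that name):
   * <pre>{@code
   * Compression.Algorithm algo = HFileWriterImpl.compressionByName("gz");
   * }</pre>
   */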
| public static Compression.Algorithm compressionByName(String algoName) { |
| if (algoName == null) { |
| return HFile.DEFAULT_COMPRESSION_ALGORITHM; |
| } |
| return Compression.getCompressionAlgorithmByName(algoName); |
| } |
| |
| /** A helper method to create HFile output streams in constructors */ |
| protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs, |
| Path path, InetSocketAddress[] favoredNodes) throws IOException { |
| FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); |
| return FSUtils.create(conf, fs, path, perms, favoredNodes); |
| } |
| |
| /** Additional initialization steps */ |
| protected void finishInit(final Configuration conf) { |
| if (blockWriter != null) { |
| throw new IllegalStateException("finishInit called twice"); |
| } |
| blockWriter = |
| new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(), |
| conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10)); |
| // Data block index writer |
| boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); |
| dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, |
| cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder); |
| dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf)); |
| dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf)); |
| inlineBlockWriters.add(dataBlockIndexWriter); |
| |
| // Meta data block index writer |
| metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(); |
| LOG.trace("Initialized with {}", cacheConf); |
| } |
| |
| /** |
 * At a block boundary, writes all the inline blocks and opens a new block.
| */ |
| protected void checkBlockBoundary() throws IOException { |
| boolean shouldFinishBlock = false; |
    // A non-zero hbase.writer.unified.encoded.blocksize.ratio means the encoded block size limit
    // is in effect, so bound blocks by their encoded size only.
| if (encodedBlockSizeLimit > 0) { |
| shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit; |
| } else { |
| shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize() |
| || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize(); |
| } |
| shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate(); |
| if (shouldFinishBlock) { |
| finishBlock(); |
| writeInlineBlocks(false); |
| newBlock(); |
| } |
| } |
| |
| /** Clean up the data block that is currently being written. */ |
| private void finishBlock() throws IOException { |
| if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) { |
| return; |
| } |
| |
    // Update the first data block offset when still UNSET; used when scanning.
| if (firstDataBlockOffset == UNSET) { |
| firstDataBlockOffset = outputStream.getPos(); |
| } |
| // Update the last data block offset each time through here. |
| lastDataBlockOffset = outputStream.getPos(); |
| blockWriter.writeHeaderAndData(outputStream); |
| int onDiskSize = blockWriter.getOnDiskSizeWithHeader(); |
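    // Index the block with a shortened "midpoint" key between the last cell of the previous block
    // and the first cell of this block; see getMidpoint() below.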
| Cell indexEntry = |
| getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock); |
| dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry), |
| lastDataBlockOffset, onDiskSize); |
| totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); |
| if (cacheConf.shouldCacheDataOnWrite()) { |
| doCacheOnWrite(lastDataBlockOffset); |
| } |
| } |
| |
| /** |
| * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is |
 * shorter; i.e. takes up less space. This trick is used when building the HFile block index. It
 * is an optimization. It does not always work. In this case we'll just return the
 * <code>right</code> cell.
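 * <p>
 * For example (a hypothetical pair of rows): between rows {@code "ham"} and {@code "house"}, the
 * shortened midpoint row is {@code "hb"}: the first byte of {@code "ham"} that differs from
 * {@code "house"} is incremented and the rest is dropped, yielding a two-byte key that still
 * sorts after {@code "ham"} and before {@code "house"}.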
| * @return A cell that sorts between <code>left</code> and <code>right</code>. |
| */ |
| public static Cell getMidpoint(final CellComparator comparator, final Cell left, |
| final Cell right) { |
| if (right == null) { |
| throw new IllegalArgumentException("right cell can not be null"); |
| } |
| if (left == null) { |
| return right; |
| } |
    // If Cells are from the meta table, don't mess around. Meta table Cells have schema
    // (table,startrow,hash) so they can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
| if (comparator instanceof MetaCellComparator) { |
| return right; |
| } |
| byte[] midRow; |
| boolean bufferBacked = |
| left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell; |
| if (bufferBacked) { |
| midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(), |
| ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(), |
| ((ByteBufferExtendedCell) right).getRowByteBuffer(), |
| ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength()); |
| } else { |
| midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(), |
| right.getRowArray(), right.getRowOffset(), right.getRowLength()); |
| } |
| if (midRow != null) { |
| return PrivateCellUtil.createFirstOnRow(midRow); |
| } |
| // Rows are same. Compare on families. |
| if (bufferBacked) { |
| midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(), |
| ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(), |
| ((ByteBufferExtendedCell) right).getFamilyByteBuffer(), |
| ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength()); |
| } else { |
| midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(), |
| left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(), |
| right.getFamilyLength()); |
| } |
| if (midRow != null) { |
| return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length); |
| } |
| // Families are same. Compare on qualifiers. |
| if (bufferBacked) { |
| midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(), |
| ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(), |
| ((ByteBufferExtendedCell) right).getQualifierByteBuffer(), |
| ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength()); |
| } else { |
| midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(), |
| left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(), |
| right.getQualifierLength()); |
| } |
| if (midRow != null) { |
| return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length); |
| } |
| // No opportunity for optimization. Just return right key. |
| return right; |
| } |
| |
| /** |
 * Try to get a byte array that falls between left and right, as short as possible in
 * lexicographical order.
 * @return a new minimally-sized array that sorts between left and right, or null if
 *         left == right.
| */ |
| private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset, |
| final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) { |
| int minLength = leftLength < rightLength ? leftLength : rightLength; |
| int diffIdx = 0; |
| for (; diffIdx < minLength; diffIdx++) { |
| byte leftByte = leftArray[leftOffset + diffIdx]; |
| byte rightByte = rightArray[rightOffset + diffIdx]; |
| if ((leftByte & 0xff) > (rightByte & 0xff)) { |
| throw new IllegalArgumentException("Left byte array sorts after right row; left=" |
| + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right=" |
| + Bytes.toStringBinary(rightArray, rightOffset, rightLength)); |
| } else if (leftByte != rightByte) { |
| break; |
| } |
| } |
| if (diffIdx == minLength) { |
| if (leftLength > rightLength) { |
| // right is prefix of left |
| throw new IllegalArgumentException("Left byte array sorts after right row; left=" |
| + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right=" |
| + Bytes.toStringBinary(rightArray, rightOffset, rightLength)); |
| } else if (leftLength < rightLength) { |
| // left is prefix of right. |
| byte[] minimumMidpointArray = new byte[minLength + 1]; |
| System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1); |
| minimumMidpointArray[minLength] = 0x00; |
| return minimumMidpointArray; |
| } else { |
| // left == right |
| return null; |
| } |
| } |
| // Note that left[diffIdx] can never be equal to 0xff since left < right |
| byte[] minimumMidpointArray = new byte[diffIdx + 1]; |
| System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1); |
| minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1); |
| return minimumMidpointArray; |
| } |
| |
| /** |
 * Try to create a new byte array that falls between left and right, as short as possible in
 * lexicographical order.
 * @return a new minimally-sized array that sorts between left and right, or null if
 *         left == right.
| */ |
| private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength, |
| ByteBuffer right, int rightOffset, int rightLength) { |
| int minLength = leftLength < rightLength ? leftLength : rightLength; |
| int diffIdx = 0; |
| for (; diffIdx < minLength; diffIdx++) { |
| int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx); |
| int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx); |
| if ((leftByte & 0xff) > (rightByte & 0xff)) { |
| throw new IllegalArgumentException("Left byte array sorts after right row; left=" |
| + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right=" |
| + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength)); |
| } else if (leftByte != rightByte) { |
| break; |
| } |
| } |
| if (diffIdx == minLength) { |
| if (leftLength > rightLength) { |
| // right is prefix of left |
| throw new IllegalArgumentException("Left byte array sorts after right row; left=" |
| + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right=" |
| + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength)); |
| } else if (leftLength < rightLength) { |
| // left is prefix of right. |
| byte[] minimumMidpointArray = new byte[minLength + 1]; |
| ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0, |
| minLength + 1); |
| minimumMidpointArray[minLength] = 0x00; |
| return minimumMidpointArray; |
| } else { |
| // left == right |
| return null; |
| } |
| } |
| // Note that left[diffIdx] can never be equal to 0xff since left < right |
| byte[] minimumMidpointArray = new byte[diffIdx + 1]; |
| ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1); |
| minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1); |
| return minimumMidpointArray; |
| } |
| |
| /** Gives inline block writers an opportunity to contribute blocks. */ |
| private void writeInlineBlocks(boolean closing) throws IOException { |
| for (InlineBlockWriter ibw : inlineBlockWriters) { |
| while (ibw.shouldWriteBlock(closing)) { |
| long offset = outputStream.getPos(); |
| boolean cacheThisBlock = ibw.getCacheOnWrite(); |
| ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType())); |
| blockWriter.writeHeaderAndData(outputStream); |
| ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(), |
| blockWriter.getUncompressedSizeWithoutHeader()); |
| totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); |
| |
| if (cacheThisBlock) { |
| doCacheOnWrite(offset); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Caches the last written HFile block. |
| * @param offset the offset of the block we want to cache. Used to determine the cache key. |
| */ |
| private void doCacheOnWrite(long offset) { |
| cacheConf.getBlockCache().ifPresent(cache -> { |
| HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); |
| try { |
| cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()), |
| cacheFormatBlock, cacheConf.isInMemory(), true); |
| } finally { |
        // refCnt is automatically increased when the block is added to the cache; see
        // RAMCache#putIfAbsent
| cacheFormatBlock.release(); |
| } |
| }); |
| } |
| |
| /** |
| * Ready a new block for writing. |
| */ |
| protected void newBlock() throws IOException { |
| // This is where the next block begins. |
| blockWriter.startWriting(BlockType.DATA); |
| firstCellInBlock = null; |
| if (lastCell != null) { |
| lastCellOfPreviousBlock = lastCell; |
| } |
| } |
| |
| /** |
| * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive. |
| * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance. |
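   * <p>
   * A usage sketch (the block name and payload here are arbitrary examples):
   *
   * <pre>{@code
   * writer.appendMetaBlock("MY_META", new BytesWritable(Bytes.toBytes("payload")));
   * }</pre>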
   * If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
| */ |
| @Override |
| public void appendMetaBlock(String metaBlockName, Writable content) { |
| byte[] key = Bytes.toBytes(metaBlockName); |
| int i; |
| for (i = 0; i < metaNames.size(); ++i) { |
| // stop when the current key is greater than our own |
| byte[] cur = metaNames.get(i); |
| if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) { |
| break; |
| } |
| } |
| metaNames.add(i, key); |
| metaData.add(i, content); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (outputStream == null) { |
| return; |
| } |
| // Save data block encoder metadata in the file info. |
| blockEncoder.saveMetadata(this); |
| // Save index block encoder metadata in the file info. |
| indexBlockEncoder.saveMetadata(this); |
| // Write out the end of the data blocks, then write meta data blocks. |
| // followed by fileinfo, data block index and meta block index. |
| |
| finishBlock(); |
| writeInlineBlocks(true); |
| |
| FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion()); |
| |
| // Write out the metadata blocks if any. |
| if (!metaNames.isEmpty()) { |
| for (int i = 0; i < metaNames.size(); ++i) { |
| // store the beginning offset |
| long offset = outputStream.getPos(); |
| // write the metadata content |
| DataOutputStream dos = blockWriter.startWriting(BlockType.META); |
| metaData.get(i).write(dos); |
| |
| blockWriter.writeHeaderAndData(outputStream); |
| totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); |
| |
| // Add the new meta block to the meta index. |
| metaBlockIndexWriter.addEntry(metaNames.get(i), offset, |
| blockWriter.getOnDiskSizeWithHeader()); |
| } |
| } |
| |
| // Load-on-open section. |
| |
| // Data block index. |
| // |
| // In version 2, this section of the file starts with the root level data |
| // block index. We call a function that writes intermediate-level blocks |
| // first, then root level, and returns the offset of the root level block |
| // index. |
| |
| long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream); |
| trailer.setLoadOnOpenOffset(rootIndexOffset); |
| |
| // Meta block index. |
| metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX), |
| "meta"); |
| blockWriter.writeHeaderAndData(outputStream); |
| totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); |
| |
| if (this.hFileContext.isIncludesMvcc()) { |
| appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); |
| appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE)); |
| } |
| |
| // File info |
| writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO)); |
| blockWriter.writeHeaderAndData(outputStream); |
| totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); |
| |
| // Load-on-open data supplied by higher levels, e.g. Bloom filters. |
| for (BlockWritable w : additionalLoadOnOpenData) { |
| blockWriter.writeBlock(w, outputStream); |
| totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); |
| } |
| |
| // Now finish off the trailer. |
| trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels()); |
| trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize()); |
| trailer.setFirstDataBlockOffset(firstDataBlockOffset); |
| trailer.setLastDataBlockOffset(lastDataBlockOffset); |
| trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass()); |
| trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries()); |
| |
| finishClose(trailer); |
| |
| blockWriter.release(); |
| } |
| |
| @Override |
| public void addInlineBlockWriter(InlineBlockWriter ibw) { |
| inlineBlockWriters.add(ibw); |
| } |
| |
| @Override |
| public void addGeneralBloomFilter(final BloomFilterWriter bfw) { |
| this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META); |
| } |
| |
| @Override |
| public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) { |
| this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META); |
| } |
| |
| private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) { |
| if (bfw.getKeyCount() <= 0) { |
| return; |
| } |
| |
| if ( |
| blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META |
| ) { |
| throw new RuntimeException("Block Type: " + blockType.toString() + "is not supported"); |
| } |
| additionalLoadOnOpenData.add(new BlockWritable() { |
| @Override |
| public BlockType getBlockType() { |
| return blockType; |
| } |
| |
| @Override |
| public void writeToBlock(DataOutput out) throws IOException { |
| bfw.getMetaWriter().write(out); |
| Writable dataWriter = bfw.getDataWriter(); |
| if (dataWriter != null) { |
| dataWriter.write(out); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public HFileContext getFileContext() { |
| return hFileContext; |
| } |
| |
| /** |
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction.
   * @param cell Cell to add. Cannot be empty nor null.
| */ |
| @Override |
| public void append(final Cell cell) throws IOException { |
| // checkKey uses comparator to check we are writing in order. |
| boolean dupKey = checkKey(cell); |
| if (!dupKey) { |
| checkBlockBoundary(); |
| } |
| |
| if (!blockWriter.isWriting()) { |
| newBlock(); |
| } |
| |
| blockWriter.write(cell); |
| |
| totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell); |
| totalValueLength += cell.getValueLength(); |
    long estimatedSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
    if (lenOfBiggestCell < estimatedSize) {
      lenOfBiggestCell = estimatedSize;
      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
    }
| // Are we the first key in this block? |
| if (firstCellInBlock == null) { |
| // If cell is big, block will be closed and this firstCellInBlock reference will only last |
| // a short while. |
| firstCellInBlock = cell; |
| } |
| |
| // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely? |
| lastCell = cell; |
| entryCount++; |
| this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId()); |
| int tagsLength = cell.getTagsLength(); |
| if (tagsLength > this.maxTagsLength) { |
| this.maxTagsLength = tagsLength; |
| } |
| } |
| |
| @Override |
| public void beforeShipped() throws IOException { |
| this.blockWriter.beforeShipped(); |
    // Deep-copy the cells we still hold references to; their backing buffers may be reused after
    // shipping.
| if (this.lastCell != null) { |
| this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell); |
| } |
| if (this.firstCellInBlock != null) { |
| this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock); |
| } |
| if (this.lastCellOfPreviousBlock != null) { |
| this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock); |
| } |
| } |
| |
| public Cell getLastCell() { |
| return lastCell; |
| } |
| |
| protected void finishFileInfo() throws IOException { |
| if (lastCell != null) { |
      // Make a copy. The copy is stuffed into our fileinfo map. It needs to be a standalone
      // byte array; the map won't take an (array, offset, length) tuple.
| byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell); |
| fileInfo.append(HFileInfo.LASTKEY, lastKey, false); |
| } |
| |
| // Average key length. |
| int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount); |
| fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false); |
| fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()), |
| false); |
| |
| // Average value length. |
| int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount); |
| fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false); |
| |
| // Biggest cell. |
| if (keyOfBiggestCell != null) { |
| fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false); |
| fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false); |
| LOG.debug("Len of the biggest cell in {} is {}, key is {}", |
| this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell, |
| CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false)); |
| } |
| |
| if (hFileContext.isIncludesTags()) { |
| // When tags are not being written in this file, MAX_TAGS_LEN is excluded |
| // from the FileInfo |
| fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); |
| boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) |
| && hFileContext.isCompressTags(); |
| fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); |
| } |
| } |
| |
| protected int getMajorVersion() { |
| return 3; |
| } |
| |
| protected int getMinorVersion() { |
| return HFileReaderImpl.MAX_MINOR_VERSION; |
| } |
| |
| protected void finishClose(FixedFileTrailer trailer) throws IOException { |
| // Write out encryption metadata before finalizing if we have a valid crypto context |
| Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); |
| if (cryptoContext != Encryption.Context.NONE) { |
| // Wrap the context's key and write it as the encryption metadata, the wrapper includes |
| // all information needed for decryption |
| trailer.setEncryptionKey(EncryptionUtil.wrapKey( |
| cryptoContext.getConf(), cryptoContext.getConf() |
| .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), |
| cryptoContext.getKey())); |
| } |
| // Now we can finish the close |
| trailer.setMetaIndexCount(metaNames.size()); |
| trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize()); |
| trailer.setEntryCount(entryCount); |
| trailer.setCompressionCodec(hFileContext.getCompression()); |
| |
| long startTime = EnvironmentEdgeManager.currentTime(); |
| trailer.serialize(outputStream); |
| HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); |
| |
| if (closeOutputStream) { |
| outputStream.close(); |
| outputStream = null; |
| } |
| } |
| } |