| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import com.google.common.base.Preconditions; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellComparator; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.KeyValueUtil; |
| import org.apache.hadoop.hbase.classification.InterfaceAudience; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.io.hfile.HFile; |
| import org.apache.hadoop.hbase.io.hfile.HFileContext; |
| import org.apache.hadoop.hbase.regionserver.compactions.Compactor; |
| import org.apache.hadoop.hbase.util.BloomFilterFactory; |
| import org.apache.hadoop.hbase.util.BloomFilterWriter; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.io.WritableUtils; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Arrays; |
| |
/**
 * A StoreFile writer. Use this to write HBase Store Files. Although the class is public, it is
 * marked {@link InterfaceAudience.Private} because it is an implementation detail of the HBase
 * regionserver.
 */
| @InterfaceAudience.Private |
| public class StoreFileWriter implements Compactor.CellSink { |
| private static final Log LOG = LogFactory.getLog(StoreFileWriter.class.getName()); |
| |
| private final BloomFilterWriter generalBloomFilterWriter; |
| private final BloomFilterWriter deleteFamilyBloomFilterWriter; |
| private final BloomType bloomType; |
| private byte[] lastBloomKey; |
| private int lastBloomKeyOffset, lastBloomKeyLen; |
| private Cell lastCell = null; |
| private long earliestPutTs = HConstants.LATEST_TIMESTAMP; |
| private Cell lastDeleteFamilyCell = null; |
| private long deleteFamilyCnt = 0; |
| |
| /** |
| * timeRangeTrackerSet is used to figure if we were passed a filled-out TimeRangeTracker or not. |
| * When flushing a memstore, we set the TimeRangeTracker that it accumulated during updates to |
| * memstore in here into this Writer and use this variable to indicate that we do not need to |
| * recalculate the timeRangeTracker bounds; it was done already as part of add-to-memstore. |
| * A completed TimeRangeTracker is not set in cases of compactions when it is recalculated. |
| */ |
| private final boolean timeRangeTrackerSet; |
| final TimeRangeTracker timeRangeTracker; |
| |
| protected HFile.Writer writer; |
| private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; |
| |
/**
 * Creates a store file writer without a ready-made {@link TimeRangeTracker}; the time range
 * is then tracked by this writer as cells are appended.
 * @param fs file system to write to
 * @param path file name to create
 * @param conf user configuration
 * @param cacheConf cache configuration handed to the HFile writer and Bloom filter factories
 * @param comparator key comparator
 * @param bloomType bloom filter setting
 * @param maxKeys the expected maximum number of keys to be added. Was used
 *        for Bloom filter size in {@link HFile} format version 1.
 * @param favoredNodes favored nodes handed to the HFile writer factory
 * @param fileContext the HFile context
 * @throws IOException problem writing to FS
 */
StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
    final CellComparator comparator, BloomType bloomType, long maxKeys,
    InetSocketAddress[] favoredNodes, HFileContext fileContext)
    throws IOException {
  // No pre-built tracker: pass an explicit (typed) null through to the full constructor.
  this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, favoredNodes, fileContext,
      (TimeRangeTracker) null);
}
| |
| /** |
| * Creates an HFile.Writer that also write helpful meta data. |
| * @param fs file system to write to |
| * @param path file name to create |
| * @param conf user configuration |
| * @param comparator key comparator |
| * @param bloomType bloom filter setting |
| * @param maxKeys the expected maximum number of keys to be added. Was used |
| * for Bloom filter size in {@link HFile} format version 1. |
| * @param favoredNodes |
| * @param fileContext - The HFile context |
| * @param trt Ready-made timetracker to use. |
| * @throws IOException problem writing to FS |
| */ |
| private StoreFileWriter(FileSystem fs, Path path, |
| final Configuration conf, |
| CacheConfig cacheConf, |
| final CellComparator comparator, BloomType bloomType, long maxKeys, |
| InetSocketAddress[] favoredNodes, HFileContext fileContext, |
| final TimeRangeTracker trt) |
| throws IOException { |
| // If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we don't destroy it. |
| // TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when |
| // it no longer writable. |
| this.timeRangeTrackerSet = trt != null; |
| this.timeRangeTracker = this.timeRangeTrackerSet? trt: new TimeRangeTracker(); |
| writer = HFile.getWriterFactory(conf, cacheConf) |
| .withPath(fs, path) |
| .withComparator(comparator) |
| .withFavoredNodes(favoredNodes) |
| .withFileContext(fileContext) |
| .create(); |
| |
| generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite( |
| conf, cacheConf, bloomType, |
| (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); |
| |
| if (generalBloomFilterWriter != null) { |
| this.bloomType = bloomType; |
| if(this.bloomType == BloomType.ROWCOL) { |
| lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(); |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " + |
| generalBloomFilterWriter.getClass().getSimpleName()); |
| } |
| } else { |
| // Not using Bloom filters. |
| this.bloomType = BloomType.NONE; |
| } |
| |
| // initialize delete family Bloom filter when there is NO RowCol Bloom |
| // filter |
| if (this.bloomType != BloomType.ROWCOL) { |
| this.deleteFamilyBloomFilterWriter = BloomFilterFactory |
| .createDeleteBloomAtWrite(conf, cacheConf, |
| (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); |
| } else { |
| deleteFamilyBloomFilterWriter = null; |
| } |
| if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) { |
| LOG.trace("Delete Family Bloom filter type for " + path + ": " + |
| deleteFamilyBloomFilterWriter.getClass().getSimpleName()); |
| } |
| } |
| |
| /** |
| * Writes meta data. |
| * Call before {@link #close()} since its written as meta data to this file. |
| * @param maxSequenceId Maximum sequence id. |
| * @param majorCompaction True if this file is product of a major compaction |
| * @throws IOException problem writing to FS |
| */ |
| public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) |
| throws IOException { |
| writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); |
| writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, |
| Bytes.toBytes(majorCompaction)); |
| appendTrackedTimestampsToMetadata(); |
| } |
| |
| /** |
| * Writes meta data. |
| * Call before {@link #close()} since its written as meta data to this file. |
| * @param maxSequenceId Maximum sequence id. |
| * @param majorCompaction True if this file is product of a major compaction |
| * @param mobCellsCount The number of mob cells. |
| * @throws IOException problem writing to FS |
| */ |
| public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, |
| final long mobCellsCount) throws IOException { |
| writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); |
| writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); |
| writer.appendFileInfo(StoreFile.MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); |
| appendTrackedTimestampsToMetadata(); |
| } |
| |
| /** |
| * Add TimestampRange and earliest put timestamp to Metadata |
| */ |
| public void appendTrackedTimestampsToMetadata() throws IOException { |
| appendFileInfo(StoreFile.TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker)); |
| appendFileInfo(StoreFile.EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); |
| } |
| |
| /** |
| * Record the earlest Put timestamp. |
| * |
| * If the timeRangeTracker is not set, |
| * update TimeRangeTracker to include the timestamp of this key |
| */ |
| public void trackTimestamps(final Cell cell) { |
| if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { |
| earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); |
| } |
| if (!timeRangeTrackerSet) { |
| timeRangeTracker.includeTimestamp(cell); |
| } |
| } |
| |
| private void appendGeneralBloomfilter(final Cell cell) throws IOException { |
| if (this.generalBloomFilterWriter != null) { |
| // only add to the bloom filter on a new, unique key |
| boolean newKey = true; |
| if (this.lastCell != null) { |
| switch(bloomType) { |
| case ROW: |
| newKey = ! CellUtil.matchingRows(cell, lastCell); |
| break; |
| case ROWCOL: |
| newKey = ! CellUtil.matchingRowColumn(cell, lastCell); |
| break; |
| case NONE: |
| newKey = false; |
| break; |
| default: |
| throw new IOException("Invalid Bloom filter type: " + bloomType + |
| " (ROW or ROWCOL expected)"); |
| } |
| } |
| if (newKey) { |
| /* |
| * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png |
| * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp |
| * |
| * 2 Types of Filtering: |
| * 1. Row = Row |
| * 2. RowCol = Row + Qualifier |
| */ |
| byte[] bloomKey = null; |
| // Used with ROW_COL bloom |
| KeyValue bloomKeyKV = null; |
| int bloomKeyOffset, bloomKeyLen; |
| |
| switch (bloomType) { |
| case ROW: |
| bloomKey = cell.getRowArray(); |
| bloomKeyOffset = cell.getRowOffset(); |
| bloomKeyLen = cell.getRowLength(); |
| break; |
| case ROWCOL: |
| // merge(row, qualifier) |
| // TODO: could save one buffer copy in case of compound Bloom |
| // filters when this involves creating a KeyValue |
| // TODO : Handle while writes also |
| bloomKeyKV = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), |
| cell.getRowLength(), |
| HConstants.EMPTY_BYTE_ARRAY, 0, 0, cell.getQualifierArray(), |
| cell.getQualifierOffset(), |
| cell.getQualifierLength()); |
| bloomKey = bloomKeyKV.getBuffer(); |
| bloomKeyOffset = bloomKeyKV.getKeyOffset(); |
| bloomKeyLen = bloomKeyKV.getKeyLength(); |
| break; |
| default: |
| throw new IOException("Invalid Bloom filter type: " + bloomType + |
| " (ROW or ROWCOL expected)"); |
| } |
| generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen); |
| if (lastBloomKey != null) { |
| int res = 0; |
| // hbase:meta does not have blooms. So we need not have special interpretation |
| // of the hbase:meta cells. We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom |
| if (bloomType == BloomType.ROW) { |
| res = Bytes.BYTES_RAWCOMPARATOR.compare(bloomKey, bloomKeyOffset, bloomKeyLen, |
| lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen); |
| } else { |
| // TODO : Caching of kv components becomes important in these cases |
| res = CellComparator.COMPARATOR.compare(bloomKeyKV, lastBloomKeyOnlyKV); |
| } |
| if (res <= 0) { |
| throw new IOException("Non-increasing Bloom keys: " |
| + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) + " after " |
| + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen)); |
| } |
| } |
| lastBloomKey = bloomKey; |
| lastBloomKeyOffset = bloomKeyOffset; |
| lastBloomKeyLen = bloomKeyLen; |
| if (bloomType == BloomType.ROWCOL) { |
| lastBloomKeyOnlyKV.setKey(bloomKey, bloomKeyOffset, bloomKeyLen); |
| } |
| this.lastCell = cell; |
| } |
| } |
| } |
| |
| private void appendDeleteFamilyBloomFilter(final Cell cell) |
| throws IOException { |
| if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) { |
| return; |
| } |
| |
| // increase the number of delete family in the store file |
| deleteFamilyCnt++; |
| if (null != this.deleteFamilyBloomFilterWriter) { |
| boolean newKey = true; |
| if (lastDeleteFamilyCell != null) { |
| // hbase:meta does not have blooms. So we need not have special interpretation |
| // of the hbase:meta cells |
| newKey = !CellUtil.matchingRows(cell, lastDeleteFamilyCell); |
| } |
| if (newKey) { |
| this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(), |
| cell.getRowOffset(), cell.getRowLength()); |
| this.lastDeleteFamilyCell = cell; |
| } |
| } |
| } |
| |
| public void append(final Cell cell) throws IOException { |
| appendGeneralBloomfilter(cell); |
| appendDeleteFamilyBloomFilter(cell); |
| writer.append(cell); |
| trackTimestamps(cell); |
| } |
| |
| public Path getPath() { |
| return this.writer.getPath(); |
| } |
| |
| public boolean hasGeneralBloom() { |
| return this.generalBloomFilterWriter != null; |
| } |
| |
| /** |
| * For unit testing only. |
| * |
| * @return the Bloom filter used by this writer. |
| */ |
| BloomFilterWriter getGeneralBloomWriter() { |
| return generalBloomFilterWriter; |
| } |
| |
| private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException { |
| boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0); |
| if (haveBloom) { |
| bfw.compactBloom(); |
| } |
| return haveBloom; |
| } |
| |
| private boolean closeGeneralBloomFilter() throws IOException { |
| boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter); |
| |
| // add the general Bloom filter writer and append file info |
| if (hasGeneralBloom) { |
| writer.addGeneralBloomFilter(generalBloomFilterWriter); |
| writer.appendFileInfo(StoreFile.BLOOM_FILTER_TYPE_KEY, |
| Bytes.toBytes(bloomType.toString())); |
| if (lastBloomKey != null) { |
| writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, Arrays.copyOfRange( |
| lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset |
| + lastBloomKeyLen)); |
| } |
| } |
| return hasGeneralBloom; |
| } |
| |
| private boolean closeDeleteFamilyBloomFilter() throws IOException { |
| boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter); |
| |
| // add the delete family Bloom filter writer |
| if (hasDeleteFamilyBloom) { |
| writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter); |
| } |
| |
| // append file info about the number of delete family kvs |
| // even if there is no delete family Bloom. |
| writer.appendFileInfo(StoreFile.DELETE_FAMILY_COUNT, |
| Bytes.toBytes(this.deleteFamilyCnt)); |
| |
| return hasDeleteFamilyBloom; |
| } |
| |
| public void close() throws IOException { |
| boolean hasGeneralBloom = this.closeGeneralBloomFilter(); |
| boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter(); |
| |
| writer.close(); |
| |
| // Log final Bloom filter statistics. This needs to be done after close() |
| // because compound Bloom filters might be finalized as part of closing. |
| if (LOG.isTraceEnabled()) { |
| LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " + |
| (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " + |
| getPath()); |
| } |
| |
| } |
| |
| public void appendFileInfo(byte[] key, byte[] value) throws IOException { |
| writer.appendFileInfo(key, value); |
| } |
| |
| /** For use in testing. |
| */ |
| HFile.Writer getHFileWriter() { |
| return writer; |
| } |
| |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG", |
| justification="Will not overflow") |
| public static class Builder { |
| private final Configuration conf; |
| private final CacheConfig cacheConf; |
| private final FileSystem fs; |
| |
| private CellComparator comparator = CellComparator.COMPARATOR; |
| private BloomType bloomType = BloomType.NONE; |
| private long maxKeyCount = 0; |
| private Path dir; |
| private Path filePath; |
| private InetSocketAddress[] favoredNodes; |
| private HFileContext fileContext; |
| private TimeRangeTracker trt; |
| |
| public Builder(Configuration conf, CacheConfig cacheConf, |
| FileSystem fs) { |
| this.conf = conf; |
| this.cacheConf = cacheConf; |
| this.fs = fs; |
| } |
| |
| /** |
| * @param trt A premade TimeRangeTracker to use rather than build one per append (building one |
| * of these is expensive so good to pass one in if you have one). |
| * @return this (for chained invocation) |
| */ |
| public Builder withTimeRangeTracker(final TimeRangeTracker trt) { |
| Preconditions.checkNotNull(trt); |
| this.trt = trt; |
| return this; |
| } |
| |
| /** |
| * Use either this method or {@link #withFilePath}, but not both. |
| * @param dir Path to column family directory. The directory is created if |
| * does not exist. The file is given a unique name within this |
| * directory. |
| * @return this (for chained invocation) |
| */ |
| public Builder withOutputDir(Path dir) { |
| Preconditions.checkNotNull(dir); |
| this.dir = dir; |
| return this; |
| } |
| |
| /** |
| * Use either this method or {@link #withOutputDir}, but not both. |
| * @param filePath the StoreFile path to write |
| * @return this (for chained invocation) |
| */ |
| public Builder withFilePath(Path filePath) { |
| Preconditions.checkNotNull(filePath); |
| this.filePath = filePath; |
| return this; |
| } |
| |
| /** |
| * @param favoredNodes an array of favored nodes or possibly null |
| * @return this (for chained invocation) |
| */ |
| public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) { |
| this.favoredNodes = favoredNodes; |
| return this; |
| } |
| |
| public Builder withComparator(CellComparator comparator) { |
| Preconditions.checkNotNull(comparator); |
| this.comparator = comparator; |
| return this; |
| } |
| |
| public Builder withBloomType(BloomType bloomType) { |
| Preconditions.checkNotNull(bloomType); |
| this.bloomType = bloomType; |
| return this; |
| } |
| |
| /** |
| * @param maxKeyCount estimated maximum number of keys we expect to add |
| * @return this (for chained invocation) |
| */ |
| public Builder withMaxKeyCount(long maxKeyCount) { |
| this.maxKeyCount = maxKeyCount; |
| return this; |
| } |
| |
| public Builder withFileContext(HFileContext fileContext) { |
| this.fileContext = fileContext; |
| return this; |
| } |
| |
| public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind/*NOT USED!!*/) { |
| // TODO: HAS NO EFFECT!!! FIX!! |
| return this; |
| } |
| /** |
| * Create a store file writer. Client is responsible for closing file when |
| * done. If metadata, add BEFORE closing using |
| * {@link StoreFileWriter#appendMetadata}. |
| */ |
| public StoreFileWriter build() throws IOException { |
| if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) { |
| throw new IllegalArgumentException("Either specify parent directory " + |
| "or file path"); |
| } |
| |
| if (dir == null) { |
| dir = filePath.getParent(); |
| } |
| |
| if (!fs.exists(dir)) { |
| fs.mkdirs(dir); |
| } |
| |
| if (filePath == null) { |
| filePath = StoreFile.getUniqueFile(fs, dir); |
| if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { |
| bloomType = BloomType.NONE; |
| } |
| } |
| |
| if (comparator == null) { |
| comparator = CellComparator.COMPARATOR; |
| } |
| return new StoreFileWriter(fs, filePath, |
| conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext, trt); |
| } |
| } |
| } |