| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache.hdfs.internal.hoplog; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.Closeable; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.EnumMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.util.ShutdownHookManager; |
| |
| import com.gemstone.gemfire.cache.CacheClosedException; |
| import com.gemstone.gemfire.cache.hdfs.HDFSIOException; |
| import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; |
| import com.gemstone.gemfire.cache.hdfs.internal.cardinality.HyperLogLog; |
| import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality; |
| import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator; |
| import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics; |
| import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics; |
| import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.ScanOperation; |
| import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.Version; |
| import com.gemstone.gemfire.internal.util.Hex; |
| import com.gemstone.gemfire.internal.util.SingletonValue; |
| import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder; |
| |
| import org.apache.hadoop.hbase.io.hfile.BlockCache; |
| import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; |
| import org.apache.hadoop.hbase.io.hfile.HFile; |
| import org.apache.hadoop.hbase.io.hfile.HFile.Reader; |
| import org.apache.hadoop.hbase.io.hfile.HFile.Writer; |
| import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader; |
| import org.apache.hadoop.hbase.io.hfile.HFileScanner; |
| import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; |
| import org.apache.hadoop.hbase.util.BloomFilterFactory; |
| import org.apache.hadoop.hbase.util.BloomFilterWriter; |
| |
| /** |
| * Implements an HFile-based {@link Hoplog}. |
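| * <p> |
| * A minimal usage sketch (hypothetical variable names; assumes a configured |
| * store, block cache, and statistics objects): |
| * <pre> |
| * HFileSortedOplog hoplog = new HFileSortedOplog(store, path, cache, stats, storeStats); |
| * HoplogWriter writer = hoplog.createWriter(expectedKeys); |
| * writer.append(keyBytes, valueBytes); // keys must be appended in sorted order |
| * writer.close(null); // attaches the bloom filter and system metadata |
| * HoplogReader reader = hoplog.getReader(); |
| * byte[] value = reader.read(keyBytes); |
| * </pre> |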
| */ |
| public final class HFileSortedOplog extends AbstractHoplog { |
| |
| // private static final boolean CACHE_DATA_BLOCKS_ON_READ = !Boolean.getBoolean("gemfire.HFileSortedOplog.DISABLE_CACHE_ON_READ"); |
| private final CacheConfig cacheConf; |
| private ICardinality entryCountEstimate; |
| |
| // a cached reader for the file |
| private final SingletonValue<HFileReader> reader; |
| |
| public HFileSortedOplog(HDFSStoreImpl store, Path hfilePath, |
| BlockCache blockCache, SortedOplogStatistics stats, |
| HFileStoreStatistics storeStats) throws IOException { |
| super(store, hfilePath, stats); |
| cacheConf = getCacheConfInstance(blockCache, stats, storeStats); |
| reader = getReaderContainer(); |
| } |
| |
| /** |
| * THIS METHOD SHOULD BE USED FOR LONER VMs ONLY |
| */ |
| public static HFileSortedOplog getHoplogForLoner(FileSystem inputFS, |
| Path hfilePath) throws IOException { |
| return new HFileSortedOplog(inputFS, hfilePath, null, null, null); |
| } |
| |
| private HFileSortedOplog(FileSystem inputFS, Path hfilePath, |
| BlockCache blockCache, SortedOplogStatistics stats, |
| HFileStoreStatistics storeStats) throws IOException { |
| super(inputFS, hfilePath, stats); |
| cacheConf = getCacheConfInstance(blockCache, stats, storeStats); |
| reader = getReaderContainer(); |
| } |
| |
| protected CacheConfig getCacheConfInstance(BlockCache blockCache, |
| SortedOplogStatistics stats, HFileStoreStatistics storeStats) { |
| CacheConfig tmpConfig = null; |
| // if (stats == null) { |
| tmpConfig = new CacheConfig(conf); |
| // } else { |
| // tmpConfig = new CacheConfig(conf, CACHE_DATA_BLOCKS_ON_READ, blockCache, |
| // HFileSortedOplogFactory.convertStatistics(stats, storeStats)); |
| // } |
| tmpConfig.shouldCacheBlockOnRead(BlockCategory.ALL_CATEGORIES); |
| return tmpConfig; |
| } |
| |
| private SingletonValue<HFileReader> getReaderContainer() { |
| return new SingletonValue<HFileReader>(new SingletonBuilder<HFileReader>() { |
| @Override |
| public HFileReader create() throws IOException { |
| if (logger.isDebugEnabled()) |
| logger.debug("{}Creating hoplog reader", logPrefix); |
| return new HFileReader(); |
| } |
| |
| @Override |
| public void postCreate() { |
| if (readerListener != null) { |
| readerListener.readerCreated(); |
| } |
| } |
| |
| @Override |
| public void createInProgress() { |
| } |
| }); |
| } |
| |
| @Override |
| public HoplogReader getReader() throws IOException { |
| return reader.get(); |
| } |
| |
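| /** |
| * Returns the cached cardinality estimate, lazily initializing it from the |
| * reader. Double-checked locking ensures concurrent callers initialize the |
| * estimate at most once, while the reader itself is fetched outside the lock. |
| */ |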
| @Override |
| public ICardinality getEntryCountEstimate() throws IOException { |
| ICardinality result = entryCountEstimate; |
| if (result == null) { |
| HoplogReader rdr = getReader(); // keep this out of the critical section |
| synchronized(this) { |
| result = entryCountEstimate; |
| if (result == null) { |
| entryCountEstimate = result = rdr.getCardinalityEstimator(); |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public HoplogWriter createWriter(int keys) throws IOException { |
| return new HFileSortedOplogWriter(keys); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| HFileReader rdr = reader.getCachedValue(); |
| return rdr == null || rdr.isClosed(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| close(true); |
| } |
| |
| @Override |
| public void close(boolean clearCache) throws IOException { |
| compareAndClose(null, clearCache); |
| } |
| |
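| /** |
| * Closes the cached reader. When {@code hfileReader} is non-null, the close |
| * proceeds only if it is still the cached instance; this keeps a stale |
| * reference from closing a newer reader created after an FS client change. |
| */ |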
| private void compareAndClose(HFileReader hfileReader, boolean clearCache) throws IOException { |
| HFileReader rdr; |
| if (hfileReader == null) { |
| rdr = reader.clear(true); |
| } else { |
| boolean result = reader.clear(hfileReader, true); |
| if (! result) { |
| if (logger.isDebugEnabled()) |
| logger.debug("{}skipping close, provided hfileReader mismatched", logPrefix); |
| return; |
| } |
| rdr = hfileReader; |
| } |
| |
| if (rdr != null) { |
| try { |
| rdr.close(clearCache); |
| } finally { |
| if (readerListener != null) { |
| readerListener.readerClosed(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "HFileSortedOplog[" + getFileName() + "]"; |
| } |
| |
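| /** |
| * Writes sorted key-value pairs to a new hfile, maintaining a row-level |
| * bloom filter that is compacted and attached when the writer is closed. |
| */ |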
| private class HFileSortedOplogWriter implements HoplogWriter { |
| private final Writer writer; |
| private final BloomFilterWriter bfw; |
| private final AtomicBoolean closed = new AtomicBoolean(false); |
| |
| public HFileSortedOplogWriter(int keys) throws IOException { |
| try { |
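| // block size defaults to 64 KB (1 << 16) unless overridden through the |
| // HoplogConfig.HFILE_BLOCK_SIZE_CONF system property; compression defaults |
| // to HoplogConfig.COMPRESSION_DEFAULT |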
| int hfileBlockSize = Integer.getInteger( |
| HoplogConfig.HFILE_BLOCK_SIZE_CONF, (1 << 16)); |
| |
| Algorithm compress = Algorithm.valueOf(System.getProperty(HoplogConfig.COMPRESSION, |
| HoplogConfig.COMPRESSION_DEFAULT)); |
| |
| // ByteComparator bc = new ByteComparator(); |
| writer = HFile.getWriterFactory(conf, cacheConf) |
| .withPath(fsProvider.getFS(), path) |
| .withBlockSize(hfileBlockSize) |
| // .withComparator(bc) |
| .withCompression(compress) |
| .create(); |
| // bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW, keys, |
| // writer, bc); |
| bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW, keys, |
| writer); |
| |
| if (logger.isDebugEnabled()) |
| logger.debug("{}Created hoplog writer with compression " + compress, logPrefix); |
| } catch (IOException e) { |
| if (logger.isDebugEnabled()) |
| logger.debug("{}IO Error while creating writer", logPrefix); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void append(byte[] key, byte[] value) throws IOException { |
| writer.append(key, value); |
| bfw.add(key, 0, key.length); |
| } |
| |
| @Override |
| public void append(ByteBuffer key, ByteBuffer value) throws IOException { |
| byte[] keyBytes = byteBufferToArray(key); |
| byte[] valueBytes = byteBufferToArray(value); |
| writer.append(keyBytes, valueBytes); |
| bfw.add(keyBytes, 0, keyBytes.length); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| close(null); |
| } |
| |
| @Override |
| public void close(EnumMap<Meta, byte[]> metadata) throws IOException { |
| if (closed.get()) { |
| if (logger.isDebugEnabled()) |
| logger.debug("{}Writer already closed", logPrefix); |
| return; |
| } |
| |
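| // finalize the bloom filter and attach it to the hfile before appending |
| // metadata and closing the underlying writer |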
| bfw.compactBloom(); |
| writer.addGeneralBloomFilter(bfw); |
| |
| // append system metadata |
| writer.appendFileInfo(Meta.GEMFIRE_MAGIC.toBytes(), Hoplog.MAGIC); |
| writer.appendFileInfo(Meta.SORTED_OPLOG_VERSION.toBytes(), HoplogVersion.V1.toBytes()); |
| writer.appendFileInfo(Meta.GEMFIRE_VERSION.toBytes(), Version.CURRENT.toBytes()); |
| |
| // append comparator info |
| // if (writer.getComparator() instanceof DelegatingSerializedComparator) { |
| // ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| // DataOutput out = new DataOutputStream(bos); |
| // |
| // writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators()); |
| // writer.appendFileInfo(Meta.COMPARATORS.toBytes(), bos.toByteArray()); |
| // } |
| |
| // append user metadata |
| HyperLogLog cachedEntryCountEstimate = null; |
| if (metadata != null) { |
| for (Entry<Meta, byte[]> entry : metadata.entrySet()) { |
| writer.appendFileInfo(entry.getKey().toBytes(), entry.getValue()); |
| if (Meta.LOCAL_CARDINALITY_ESTIMATE_V2.equals(entry.getKey())) { |
| cachedEntryCountEstimate = HyperLogLog.Builder.build(entry.getValue()); |
| } |
| } |
| } |
| |
| writer.close(); |
| if (logger.isDebugEnabled()) |
| logger.debug("{}Completed closing writer", logPrefix); |
| closed.set(true); |
| // cache estimate value to avoid reads later |
| entryCountEstimate = cachedEntryCountEstimate; |
| } |
| |
| @Override |
| public void hsync() throws IOException { |
| throw new UnsupportedOperationException("hsync is not supported for HFiles"); |
| } |
| |
| @Override |
| public long getCurrentSize() throws IOException { |
| throw new UnsupportedOperationException("getCurrentSize is not supported for HFiles"); |
| } |
| |
| // private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException { |
| // out.writeInt(comparators.length); |
| // for (SerializedComparator sc : comparators) { |
| // out.writeUTF(sc.getClass().getName()); |
| // if (sc instanceof DelegatingSerializedComparator) { |
| // writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators()); |
| // } |
| // } |
| // } |
| } |
| |
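| /** |
| * Classifies a read failure: throws {@link CacheClosedException} if the |
| * filesystem is shutting down, unwraps and rethrows {@link RemoteException}s, |
| * and closes the stale reader when a new FS client is detected. If |
| * {@code skipFailIfSafe} is set and the FS client changed, the method returns |
| * normally so the caller can retry with a fresh reader; otherwise it throws |
| * {@link HDFSIOException}. |
| */ |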
| private void handleReadIOError(HFileReader hfileReader, IOException e, boolean skipFailIfSafe) { |
| if (logger.isDebugEnabled()) |
| logger.debug("Read IO error", e); |
| boolean safeError = ShutdownHookManager.get().isShutdownInProgress(); |
| if (safeError) { |
| // IOException due to a closed file system. This happens when the member |
| // is shutting down |
| if (logger.isDebugEnabled()) |
| logger.debug("IO error caused by filesystem shutdown", e); |
| throw new CacheClosedException("IO error caused by filesystem shutdown", e); |
| } |
| |
| // expose the error wrapped inside the remote exception; remote exceptions |
| // are handled by the file system client, so let the caller handle this error |
| if (e instanceof RemoteException) { |
| e = ((RemoteException) e).unwrapRemoteException(); |
| throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e); |
| } |
| |
| FileSystem currentFs = fsProvider.checkFileSystem(); |
| if (hfileReader != null && hfileReader.previousFS != currentFs) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}Detected new FS client, closing old reader", logPrefix); |
| if (currentFs != null) { |
| logger.debug("{}CurrentFs:" + currentFs.getUri() + "-" |
| + currentFs.hashCode(), logPrefix); |
| } |
| if (hfileReader.previousFS != null) { |
| logger.debug("{}OldFs:" + hfileReader.previousFS.getUri() + "-" |
| + hfileReader.previousFS.hashCode() + ", closing old reader", logPrefix); |
| } |
| } |
| try { |
| HFileSortedOplog.this.compareAndClose(hfileReader, false); |
| } catch (Exception ex) { |
| if (logger.isDebugEnabled()) |
| logger.debug("Failed to close reader", ex); |
| } |
| if (skipFailIfSafe) { |
| if (logger.isDebugEnabled()) |
| logger.debug("Not faling after io error since FS client changed"); |
| return; |
| } |
| } |
| |
| // it is not a safe error. let the caller handle it |
| throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e); |
| } |
| |
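| /** |
| * Reader over a single hfile. It caches the file info, lazily exposes the |
| * bloom filter, and remembers the FS client used to open the file so that a |
| * client change can be detected after IO errors. |
| */ |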
| class HFileReader implements HoplogReader, Closeable { |
| private final Reader reader; |
| private volatile BloomFilter hoplogBloom; |
| private final AtomicBoolean closed; |
| private final Map<byte[], byte[]> fileInfo; |
| private final HyperLogLog estimator; |
| private final FileSystem previousFS; |
| |
| public HFileReader() throws IOException { |
| try { |
| FileSystem fs = fsProvider.getFS(); |
| reader = HFile.createReader(fs, path, cacheConf); |
| fileInfo = reader.loadFileInfo(); |
| closed = new AtomicBoolean(false); |
| |
| validate(); |
| if (reader.getComparator() instanceof DelegatingSerializedComparator) { |
| loadComparators((DelegatingSerializedComparator) reader.getComparator()); |
| } |
| |
| // read the old HLL if it exists so that a CardinalityMergeException will trigger a Major Compaction |
| byte[] hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE.toBytes()); |
| if (hll != null) { |
| entryCountEstimate = estimator = HyperLogLog.Builder.build(hll); |
| } else if ((hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE_V2.toBytes())) != null) { |
| entryCountEstimate = estimator = HyperLogLog.Builder.build(hll); |
| } else { |
| estimator = new HyperLogLog(HdfsSortedOplogOrganizer.HLL_CONSTANT); |
| } |
| |
| previousFS = fs; |
| } catch (IOException e) { |
| if (logger.isDebugEnabled()) |
| logger.debug("IO Error while creating reader", e); |
| throw e; |
| } |
| } |
| |
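| /** |
| * Reads the value for {@code key}, retrying once if the failure was caused |
| * by an FS client change detected by handleReadIOError. |
| */ |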
| @Override |
| public byte[] read(byte[] key) throws IOException { |
| IOException err = null; |
| HFileReader delegateReader = this; |
| for (int retry = 1; retry >= 0; retry --) { |
| try { |
| return delegateReader.readDelegate(key); |
| } catch (IOException e) { |
| err = e; |
| handleReadIOError(delegateReader, e, retry > 0); |
| // The current reader may have been closed during error handling. Get a |
| // new one for the retry attempt |
| try { |
| delegateReader = (HFileReader) HFileSortedOplog.this.getReader(); |
| } catch (IOException ex) { |
| handleReadIOError(null, e, false); |
| } |
| } |
| } |
| |
| if (logger.isDebugEnabled()) |
| logger.debug("Throwing err from read delegate ", err); |
| throw err; |
| } |
| |
| private byte[] readDelegate(byte[] key) throws IOException { |
| try { |
| if (!getBloomFilter().mightContain(key)) { |
| // bloom filter check failed, the key is not present in this hoplog |
| return null; |
| } |
| } catch (IllegalArgumentException e) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException) e.getCause(); |
| } else { |
| throw e; |
| } |
| } |
| |
| byte[] valueBytes = null; |
| ByteBuffer bb = get(key); |
| if (bb != null) { |
| valueBytes = new byte[bb.remaining()]; |
| bb.get(valueBytes); |
| } else { |
| stats.getBloom().falsePositive(); |
| } |
| return valueBytes; |
| } |
| |
| @Override |
| public ByteBuffer get(byte[] key) throws IOException { |
| assert key != null; |
| HFileScanner seek = reader.getScanner(false, true); |
| if (seek.seekTo(key) == 0) { |
| return seek.getValue(); |
| } |
| return null; |
| } |
| |
| @Override |
| public HoplogIterator<byte[], byte[]> scan(byte[] from, boolean fromInclusive, byte[] to, |
| boolean toInclusive) throws IOException { |
| IOException err = null; |
| HFileReader delegateReader = this; |
| for (int retry = 1; retry >= 0; retry --) { |
| try { |
| return delegateReader.scanDelegate(from, fromInclusive, to, toInclusive); |
| } catch (IOException e) { |
| err = e; |
| handleReadIOError(delegateReader, e, retry > 0); |
| // The current reader may have been closed during error handling. Get a |
| // new one for the retry attempt |
| try { |
| delegateReader = (HFileReader) HFileSortedOplog.this.getReader(); |
| } catch (IOException ex) { |
| handleReadIOError(null, e, false); |
| } |
| } |
| } |
| if (logger.isDebugEnabled()) |
| logger.debug("Throwing err from scan delegate ", err); |
| throw err; |
| } |
| |
| private HoplogIterator<byte[], byte[]> scanDelegate(byte[] from, boolean fromInclusive, byte[] to, |
| boolean toInclusive) throws IOException { |
| return new HFileSortedIterator(reader.getScanner(true, false), from, |
| fromInclusive, to, toInclusive); |
| } |
| |
| @Override |
| public HoplogIterator<byte[], byte[]> scan(long offset, long length) |
| throws IOException { |
| /* |
| * Identifies the first and last keys to be scanned based on offset and |
| * length. It loads the hfile block index and finds the first hfile block |
| * starting after offset; that block's key becomes the 'from' key for the |
| * scanner. Similarly, it locates the first block starting beyond the |
| * offset + length range and uses that block's key as the 'to' key. |
| */ |
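| // Example (illustrative numbers): with root blocks at offsets 0, 64K and |
| // 128K, a request of offset=10K, length=60K picks the key of the block at |
| // 64K as 'from' and the key of the block at 128K as 'to', so the scan |
| // covers [key@64K, key@128K) |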
| |
| // load block indexes in memory |
| BlockIndexReader bir = reader.getDataBlockIndexReader(); |
| int blockCount = bir.getRootBlockCount(); |
| |
| byte[] fromKey = null, toKey = null; |
| |
| // find from key |
| int i = 0; |
| for (; i < blockCount; i++) { |
| if (bir.getRootBlockOffset(i) < offset) { |
| // hfile block has offset less than this reader's split offset. check |
| // the next block |
| continue; |
| } |
| |
| // found the first hfile block starting after offset |
| fromKey = bir.getRootBlockKey(i); |
| break; |
| } |
| |
| if (fromKey == null) { |
| // no block starts after the offset; return a no-op scanner |
| return new HFileSortedIterator(null, null, false, null, false); |
| } |
| |
| // find to key |
| for (; i < blockCount; i++) { |
| if (bir.getRootBlockOffset(i) < (offset + length)) { |
| // this hfile block lies within the offset+length range. check the |
| // next block for a higher offset |
| continue; |
| } |
| |
| // found the first block starting beyond the offset+length range. |
| toKey = bir.getRootBlockKey(i); |
| break; |
| } |
| |
| // from key is included in scan and to key is excluded |
| HFileScanner scanner = reader.getScanner(true, false); |
| return new HFileSortedIterator(scanner, fromKey, true, toKey, false); |
| } |
| |
| @Override |
| public HoplogIterator<byte[], byte[]> scan() throws IOException { |
| return scan(null, null); |
| } |
| |
| public HoplogIterator<byte[], byte[]> scan(byte[] from, byte[] to) |
| throws IOException { |
| return scan(from, true, to, false); |
| } |
| |
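| /** |
| * Lazily creates the bloom filter wrapper on first use; double-checked |
| * locking ensures the hfile bloom metadata is loaded at most once. |
| */ |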
| @Override |
| public BloomFilter getBloomFilter() throws IOException { |
| BloomFilter result = hoplogBloom; |
| if (result == null) { |
| synchronized (this) { |
| result = hoplogBloom; |
| if (result == null) { |
| hoplogBloom = result = new BloomFilterImpl(); |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return closed.get(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| close(true); |
| } |
| |
| public void close(boolean clearCache) throws IOException { |
| if (closed.compareAndSet(false, true)) { |
| if (logger.isDebugEnabled()) |
| logger.debug("{}Closing reader", logPrefix); |
| reader.close(clearCache); |
| } |
| } |
| |
| @Override |
| public long getEntryCount() { |
| return reader.getEntries(); |
| } |
| |
| public ICardinality getCardinalityEstimator() { |
| return estimator; |
| } |
| |
| @Override |
| public long sizeEstimate() { |
| return getCardinalityEstimator().cardinality(); |
| } |
| |
| private void validate() throws IOException { |
| // check magic |
| byte[] magic = fileInfo.get(Meta.GEMFIRE_MAGIC.toBytes()); |
| if (!Arrays.equals(magic, MAGIC)) { |
| throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic))); |
| } |
| |
| // check version compatibility |
| byte[] ver = fileInfo.get(Meta.SORTED_OPLOG_VERSION.toBytes()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}Hoplog version is " + Hex.toHex(ver), logPrefix); |
| } |
| |
| if (!Arrays.equals(ver, HoplogVersion.V1.toBytes())) { |
| throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver))); |
| } |
| } |
| |
| private void loadComparators(DelegatingSerializedComparator comparator) throws IOException { |
| byte[] raw = fileInfo.get(Meta.COMPARATORS.toBytes()); |
| assert raw != null; |
| |
| DataInput in = new DataInputStream(new ByteArrayInputStream(raw)); |
| comparator.setComparators(readComparators(in)); |
| } |
| |
| private SerializedComparator[] readComparators(DataInput in) throws IOException { |
| try { |
| SerializedComparator[] comps = new SerializedComparator[in.readInt()]; |
| assert comps.length > 0; |
| |
| for (int i = 0; i < comps.length; i++) { |
| comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance(); |
| if (comps[i] instanceof DelegatingSerializedComparator) { |
| ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in)); |
| } |
| } |
| return comps; |
| |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| |
| class BloomFilterImpl implements BloomFilter { |
| private final org.apache.hadoop.hbase.util.BloomFilter hfileBloom; |
| |
| public BloomFilterImpl() throws IOException { |
| DataInput bin = reader.getGeneralBloomFilterMetadata(); |
| // instantiate bloom filter if meta present in hfile |
| if (bin != null) { |
| hfileBloom = BloomFilterFactory.createFromMeta(bin, reader); |
| if (reader.getComparator() instanceof DelegatingSerializedComparator) { |
| loadComparators((DelegatingSerializedComparator) hfileBloom.getComparator()); |
| } |
| } else { |
| hfileBloom = null; |
| } |
| } |
| |
| @Override |
| public boolean mightContain(byte[] key) { |
| assert key != null; |
| return mightContain(key, 0, key.length); |
| } |
| |
| @Override |
| public boolean mightContain(byte[] key, int keyOffset, int keyLength) { |
| assert key != null; |
| long start = stats.getBloom().begin(); |
| boolean found = hfileBloom == null ? true : hfileBloom.contains(key, keyOffset, keyLength, null); |
| stats.getBloom().end(start); |
| return found; |
| } |
| |
| @Override |
| public long getBloomSize() { |
| return hfileBloom == null ? 0 : hfileBloom.getByteSize(); |
| } |
| } |
| |
| // TODO change the KV types to ByteBuffer instead of byte[] |
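| /** |
| * Iterates over the hoplog's sorted key-value pairs, optionally bounded by |
| * 'from' and 'to' keys. One key-value pair is always prefetched so that |
| * hasNext() can answer without advancing the underlying scanner. |
| */ |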
| public final class HFileSortedIterator implements HoplogIterator<byte[], byte[]> { |
| private final HFileScanner scan; |
| |
| private final byte[] from; |
| private final boolean fromInclusive; |
| |
| private final byte[] to; |
| private final boolean toInclusive; |
| |
| private ByteBuffer prefetchedKey; |
| private ByteBuffer prefetchedValue; |
| private ByteBuffer currentKey; |
| private ByteBuffer currentValue; |
| |
| // variables tracking scan statistics |
| ScanOperation scanStat; |
| private long scanStart; |
| |
| public HFileSortedIterator(HFileScanner scan, byte[] from, boolean fromInclusive, byte[] to, |
| boolean toInclusive) throws IOException { |
| this.scan = scan; |
| this.from = from; |
| this.fromInclusive = fromInclusive; |
| this.to = to; |
| this.toInclusive = toInclusive; |
| |
| scanStat = (stats == null) ? new SortedOplogStatistics("", "").new ScanOperation( |
| 0, 0, 0, 0, 0, 0, 0) : stats.getScan(); |
| scanStart = scanStat.begin(); |
| |
| if (scan == null) { |
| return; |
| } |
| |
| assert from == null || to == null |
| || scan.getReader().getComparator().compare(from, to) <= 0; |
| |
| initIterator(); |
| } |
| |
| /* |
| * prefetches the first key and value from the file so that hasNext() works |
| */ |
| private void initIterator() throws IOException { |
| long startNext = scanStat.beginIteration(); |
| boolean scanSuccessful = true; |
| if (from == null) { |
| scanSuccessful = scan.seekTo(); |
| } else { |
| int compare = scan.seekTo(from); |
| if (compare == 0 && !fromInclusive || compare > 0) { |
| // skip the current key if it equals 'from' but 'from' is exclusive, or |
| // if the scanner landed on a key smaller than 'from' |
| scanSuccessful = scan.next(); |
| } |
| } |
| |
| populateKV(startNext, scanSuccessful); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return prefetchedKey != null; |
| } |
| |
| @Override |
| public byte[] next() throws IOException { |
| return byteBufferToArray(nextBB()); |
| } |
| |
| public ByteBuffer nextBB() throws IOException { |
| long startNext = scanStat.beginIteration(); |
| if (prefetchedKey == null) { |
| throw new NoSuchElementException(); |
| } |
| |
| currentKey = prefetchedKey; |
| currentValue = prefetchedValue; |
| |
| prefetchedKey = null; |
| prefetchedValue = null; |
| |
| // record iteration stats even when the scan reaches end of file |
| populateKV(startNext, scan.next()); |
| |
| return currentKey; |
| } |
| |
| |
| private void populateKV(long nextStartTime, boolean scanSuccessful) { |
| if (!scanSuccessful) { |
| // end of file reached. collect stats and return |
| scanStat.endIteration(0, nextStartTime); |
| return; |
| } |
| |
| prefetchedKey = scan.getKey(); |
| prefetchedValue = scan.getValue(); |
| |
| if (to != null) { |
| // TODO Optimization? Perform an int comparison instead of a byte[] |
| // comparison by identifying the offset at which the key exceeds 'to'. |
| int compare = scan.getReader().getComparator().compare( |
| prefetchedKey.array(), prefetchedKey.arrayOffset(), prefetchedKey.remaining(), to, 0, to.length); |
| if (compare > 0 || (compare == 0 && !toInclusive)) { |
| prefetchedKey = null; |
| prefetchedValue = null; |
| return; |
| } |
| } |
| |
| // account for bytes read and time spent |
| int byteCount = prefetchedKey.remaining() + prefetchedValue.remaining(); |
| scanStat.endIteration(byteCount, nextStartTime); |
| } |
| |
| |
| @Override |
| public byte[] getKey() { |
| return byteBufferToArray(getKeyBB()); |
| } |
| public ByteBuffer getKeyBB() { |
| return currentKey; |
| } |
| |
| @Override |
| public byte[] getValue() { |
| return byteBufferToArray(getValueBB()); |
| } |
| public ByteBuffer getValueBB() { |
| return currentValue; |
| } |
| |
| @Override |
| public void remove() { |
| throw new UnsupportedOperationException("Cannot delete a key-value from a hfile sorted oplog"); |
| } |
| |
| @Override |
| public void close() { |
| scanStat.end(scanStart); |
| } |
| } |
| } |
| |
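| /** |
| * Copies the remaining bytes of {@code bb} into a new array. The buffer is |
| * duplicated first, so the caller's position and limit are left untouched. |
| */ |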
| public static byte[] byteBufferToArray(ByteBuffer bb) { |
| if (bb == null) { |
| return null; |
| } |
| |
| byte[] tmp = new byte[bb.remaining()]; |
| bb.duplicate().get(tmp); |
| return tmp; |
| } |
| } |