blob: bc0df4306b626d60e0b6b6d5d6b465fd87492256 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static java.util.Objects.requireNonNull;
import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an
* LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
* non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
* operations.
* </p>
* Contains three levels of block priority to allow for scan-resistance and in-memory families
* {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An
* in-memory column family is a column family that should be served from memory if possible):
* single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory
* priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()},
* otherwise a block becomes a single access priority the first time it is read into this block
* cache. If a block is accessed again while in cache, it is marked as a multiple access priority
* block. This delineation of blocks is used to prevent scans from thrashing the cache adding a
* least-frequently-used element to the eviction algorithm.
* <p/>
* Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
* priority will retain close to its maximum size, however, if any priority is not using its entire
* chunk the others are able to grow beyond their chunk size.
* <p/>
* Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
* block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
* is only used for pre-allocating data structures and in initial heap estimation of the map.
* <p/>
* The detailed constructor defines the sizes for the three priorities (they should total to the
* <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction
* thread.
* <p/>
* The <code>acceptable size</code> is the cache size level which triggers the eviction process to
* start. It evicts enough blocks to get the size below the minimum size specified.
* <p/>
* Eviction happens in a separate thread and involves a single full-scan of the map. It determines
* how many bytes must be freed to reach the minimum size, and then while scanning determines the
* fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times
* bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative
* sizes and usage.
*/
@InterfaceAudience.Private
public class LruBlockCache implements FirstLevelBlockCache {
private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
/**
* Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
* evicting during an eviction run till the cache size is down to 80% of the total.
*/
private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
/**
* Acceptable size of cache (no evictions if size < acceptable)
*/
private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
"hbase.lru.blockcache.acceptable.factor";
/**
* Hard capacity limit of cache, will reject any put if size > this * acceptable
*/
static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
"hbase.lru.blockcache.hard.capacity.limit.factor";
private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
"hbase.lru.blockcache.single.percentage";
private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
"hbase.lru.blockcache.multi.percentage";
private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
"hbase.lru.blockcache.memory.percentage";
/**
* Configuration key to force data-block always (except in-memory are too much)
* cached in memory for in-memory hfile, unlike inMemory, which is a column-family
* configuration, inMemoryForceMode is a cluster-wide configuration
*/
private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
"hbase.lru.rs.inmemoryforcemode";
/* Default Configuration Parameters*/
/* Backing Concurrent Map Configuration */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/* Eviction thresholds */
private static final float DEFAULT_MIN_FACTOR = 0.95f;
static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
/* Priority buckets */
private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
private static final float DEFAULT_MULTI_FACTOR = 0.50f;
private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
/* Statistics thread */
private static final int STAT_THREAD_PERIOD = 60 * 5;
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
/**
* Defined the cache map as {@link ConcurrentHashMap} here, because in
* {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent
* (key, func). Besides, the func method must execute exactly once only when the key is present
* and under the lock context, otherwise the reference count will be messed up. Notice that the
* {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
*/
private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
/** Eviction lock (locked when eviction in process) */
private transient final ReentrantLock evictionLock = new ReentrantLock(true);
private final long maxBlockSize;
/** Volatile boolean to track if we are in an eviction process or not */
private volatile boolean evictionInProgress = false;
/** Eviction thread */
private transient final EvictionThread evictionThread;
/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
/** Current size of cache */
private final AtomicLong size;
/** Current size of data blocks */
private final LongAdder dataBlockSize;
/** Current number of cached elements */
private final AtomicLong elements;
/** Current number of cached data block elements */
private final LongAdder dataBlockElements;
/** Cache access count (sequential ID) */
private final AtomicLong count;
/** hard capacity limit */
private float hardCapacityLimitFactor;
/** Cache statistics */
private final CacheStats stats;
/** Maximum allowable size of cache (block put if size > max, evict) */
private long maxSize;
/** Approximate block size */
private long blockSize;
/** Acceptable size of cache (no evictions if size < acceptable) */
private float acceptableFactor;
/** Minimum threshold of cache (when evicting, evict until size < min) */
private float minFactor;
/** Single access bucket size */
private float singleFactor;
/** Multiple access bucket size */
private float multiFactor;
/** In-memory bucket size */
private float memoryFactor;
/** Overhead of the structure itself */
private long overhead;
/** Whether in-memory hfile's data block has higher priority when evicting */
private boolean forceInMemory;
/**
* Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
* external cache as L2.
* Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
*/
private transient BlockCache victimHandler = null;
/**
* Default constructor. Specify maximum size and expected average block
* size (approximation is fine).
*
* <p>All other factors will be calculated based on defaults specified in
* this class.
*
* @param maxSize maximum size of cache, in bytes
* @param blockSize approximate size of each block, in bytes
*/
public LruBlockCache(long maxSize, long blockSize) {
this(maxSize, blockSize, true);
}
/**
* Constructor used for testing. Allows disabling of the eviction thread.
*/
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
this(maxSize, blockSize, evictionThread,
(int) Math.ceil(1.2 * maxSize / blockSize),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
DEFAULT_SINGLE_FACTOR,
DEFAULT_MULTI_FACTOR,
DEFAULT_MEMORY_FACTOR,
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
false,
DEFAULT_MAX_BLOCK_SIZE);
}
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
this(maxSize, blockSize, evictionThread,
(int) Math.ceil(1.2 * maxSize / blockSize),
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL,
conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
}
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
this(maxSize, blockSize, true, conf);
}
/**
* Configurable constructor. Use this constructor if not using defaults.
*
* @param maxSize maximum size of this cache, in bytes
* @param blockSize expected average size of blocks, in bytes
* @param evictionThread whether to run evictions in a bg thread or not
* @param mapInitialSize initial size of backing ConcurrentHashMap
* @param mapLoadFactor initial load factor of backing ConcurrentHashMap
* @param mapConcurrencyLevel initial concurrency factor for backing CHM
* @param minFactor percentage of total size that eviction will evict until
* @param acceptableFactor percentage of total size that triggers eviction
* @param singleFactor percentage of total size for single-access blocks
* @param multiFactor percentage of total size for multiple-access blocks
* @param memoryFactor percentage of total size for in-memory blocks
*/
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
float minFactor, float acceptableFactor, float singleFactor,
float multiFactor, float memoryFactor, float hardLimitFactor,
boolean forceInMemory, long maxBlockSize) {
this.maxBlockSize = maxBlockSize;
if(singleFactor + multiFactor + memoryFactor != 1 ||
singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
throw new IllegalArgumentException("Single, multi, and memory factors " +
" should be non-negative and total 1.0");
}
if (minFactor >= acceptableFactor) {
throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
}
if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
throw new IllegalArgumentException("all factors must be < 1");
}
this.maxSize = maxSize;
this.blockSize = blockSize;
this.forceInMemory = forceInMemory;
map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
this.minFactor = minFactor;
this.acceptableFactor = acceptableFactor;
this.singleFactor = singleFactor;
this.multiFactor = multiFactor;
this.memoryFactor = memoryFactor;
this.stats = new CacheStats(this.getClass().getSimpleName());
this.count = new AtomicLong(0);
this.elements = new AtomicLong(0);
this.dataBlockElements = new LongAdder();
this.dataBlockSize = new LongAdder();
this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
this.size = new AtomicLong(this.overhead);
this.hardCapacityLimitFactor = hardLimitFactor;
if (evictionThread) {
this.evictionThread = new EvictionThread(this);
this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
} else {
this.evictionThread = null;
}
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
// every five minutes.
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
}
@Override
public void setVictimCache(BlockCache victimCache) {
if (victimHandler != null) {
throw new IllegalArgumentException("The victim cache has already been set");
}
victimHandler = requireNonNull(victimCache);
}
@Override
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
if (this.size.get() > acceptableSize() && !evictionInProgress) {
runEviction();
}
}
/**
* The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
* access will be more faster then off-heap, the small index block or meta block cached in
* CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
* calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
* heap size will be messed up. Here we will clone the block into an heap block if it's an
* off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
* the block (HBASE-22127): <br>
* 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
* 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
* reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
* JVM, so need a retain here.
* @param buf the original block
* @return an block with an heap memory backend.
*/
private Cacheable asReferencedHeapBlock(Cacheable buf) {
if (buf instanceof HFileBlock) {
HFileBlock blk = ((HFileBlock) buf);
if (blk.isSharedMem()) {
return HFileBlock.deepCloneOnHeap(blk);
}
}
// The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
return buf.retain();
}
// BlockCache implementation
/**
* Cache the block with the specified name and buffer.
* <p>
* It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
* this can happen, for which we compare the buffer contents.
*
* @param cacheKey block's cache key
* @param buf block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
if (buf.heapSize() > maxBlockSize) {
// If there are a lot of blocks that are too
// big this can make the logs way too noisy.
// So we log 2%
if (stats.failInsert() % 50 == 0) {
LOG.warn("Trying to cache too large a block "
+ cacheKey.getHfileName() + " @ "
+ cacheKey.getOffset()
+ " is " + buf.heapSize()
+ " which is larger than " + maxBlockSize);
}
return;
}
LruCachedBlock cb = map.get(cacheKey);
if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
return;
}
long currentSize = size.get();
long currentAcceptableSize = acceptableSize();
long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
if (currentSize >= hardLimitSize) {
stats.failInsert();
if (LOG.isTraceEnabled()) {
LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
+ " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
+ " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
+ ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
}
if (!evictionInProgress) {
runEviction();
}
return;
}
// Ensure that the block is an heap one.
buf = asReferencedHeapBlock(buf);
cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
long newSize = updateSizeMetrics(cb, false);
map.put(cacheKey, cb);
long val = elements.incrementAndGet();
if (buf.getBlockType().isData()) {
dataBlockElements.increment();
}
if (LOG.isTraceEnabled()) {
long size = map.size();
assertCounterSanity(size, val);
}
if (newSize > currentAcceptableSize && !evictionInProgress) {
runEviction();
}
}
/**
* Sanity-checking for parity between actual block cache content and metrics.
* Intended only for use with TRACE level logging and -ea JVM.
*/
private static void assertCounterSanity(long mapSize, long counterVal) {
if (counterVal < 0) {
LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
", mapSize=" + mapSize);
return;
}
if (mapSize < Integer.MAX_VALUE) {
double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
if (pct_diff > 0.05) {
LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
", mapSize=" + mapSize);
}
}
}
/**
* Cache the block with the specified name and buffer.
* <p>
* TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
* sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
* switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
* otherwise the caching size is based on off-heap.
* @param cacheKey block's cache key
* @param buf block buffer
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
cacheBlock(cacheKey, buf, false);
}
/**
* Helper function that updates the local size counter and also updates any
* per-cf or per-blocktype metrics it can discern from given
* {@link LruCachedBlock}
*/
private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
long heapsize = cb.heapSize();
BlockType bt = cb.getBuffer().getBlockType();
if (evict) {
heapsize *= -1;
}
if (bt != null && bt.isData()) {
dataBlockSize.add(heapsize);
}
return size.addAndGet(heapsize);
}
/**
* Get the buffer of the block with the specified name.
*
* @param cacheKey block's cache key
* @param caching true if the caller caches blocks on cache misses
* @param repeat Whether this is a repeat lookup for the same block
* (used to avoid double counting cache misses when doing double-check
* locking)
* @param updateCacheMetrics Whether to update cache metrics or not
*
* @return buffer of specified cache key, or null if not in cache
*/
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
// It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
// this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
// the block and release, then we're retaining a block with refCnt=0 which is disallowed.
// see HBASE-22422.
val.getBuffer().retain();
return val;
});
if (cb == null) {
if (!repeat && updateCacheMetrics) {
stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
// If there is another block cache then try and read there.
// However if this is a retry ( second time in double checked locking )
// And it's already a miss then the l2 will also be a miss.
if (victimHandler != null && !repeat) {
// The handler will increase result's refCnt for RPC, so need no extra retain.
Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
// Promote this to L1.
if (result != null) {
if (caching) {
cacheBlock(cacheKey, result, /* inMemory = */ false);
}
}
return result;
}
return null;
}
if (updateCacheMetrics) {
stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
cb.access(count.incrementAndGet());
return cb.getBuffer();
}
/**
* Whether the cache contains block with specified cacheKey
*
* @return true if contains the block
*/
@Override
public boolean containsBlock(BlockCacheKey cacheKey) {
return map.containsKey(cacheKey);
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
LruCachedBlock cb = map.get(cacheKey);
return cb != null && evictBlock(cb, false) > 0;
}
/**
* Evicts all blocks for a specific HFile. This is an
* expensive operation implemented as a linear-time search through all blocks
* in the cache. Ideally this should be a search in a log-access-time map.
*
* <p>
* This is used for evict-on-close to remove all blocks of a specific HFile.
*
* @return the number of blocks evicted
*/
@Override
public int evictBlocksByHfileName(String hfileName) {
int numEvicted = 0;
for (BlockCacheKey key : map.keySet()) {
if (key.getHfileName().equals(hfileName)) {
if (evictBlock(key)) {
++numEvicted;
}
}
}
if (victimHandler != null) {
numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
}
return numEvicted;
}
/**
* Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
* block may be read again later
*
* @param evictedByEvictionProcess true if the given block is evicted by
* EvictionThread
* @return the heap size of evicted block
*/
protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
LruCachedBlock previous = map.remove(block.getCacheKey());
if (previous == null) {
return 0;
}
updateSizeMetrics(block, true);
long val = elements.decrementAndGet();
if (LOG.isTraceEnabled()) {
long size = map.size();
assertCounterSanity(size, val);
}
if (block.getBuffer().getBlockType().isData()) {
dataBlockElements.decrement();
}
if (evictedByEvictionProcess) {
// When the eviction of the block happened because of invalidation of HFiles, no need to
// update the stats counter.
stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
if (victimHandler != null) {
victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
}
}
// Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
// NOT move this up because if do that then the victimHandler may access the buffer with
// refCnt = 0 which is disallowed.
previous.getBuffer().release();
return block.heapSize();
}
/**
* Multi-threaded call to run the eviction process.
*/
private void runEviction() {
if (evictionThread == null) {
evict();
} else {
evictionThread.evict();
}
}
boolean isEvictionInProgress() {
return evictionInProgress;
}
long getOverhead() {
return overhead;
}
/**
* Eviction method.
*/
void evict() {
// Ensure only one eviction at a time
if (!evictionLock.tryLock()) {
return;
}
try {
evictionInProgress = true;
long currentSize = this.size.get();
long bytesToFree = currentSize - minSize();
if (LOG.isTraceEnabled()) {
LOG.trace("Block cache LRU eviction started; Attempting to free " +
StringUtils.byteDesc(bytesToFree) + " of total=" +
StringUtils.byteDesc(currentSize));
}
if (bytesToFree <= 0) {
return;
}
// Instantiate priority buckets
BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
// Scan entire map putting into appropriate buckets
for (LruCachedBlock cachedBlock : map.values()) {
switch (cachedBlock.getPriority()) {
case SINGLE: {
bucketSingle.add(cachedBlock);
break;
}
case MULTI: {
bucketMulti.add(cachedBlock);
break;
}
case MEMORY: {
bucketMemory.add(cachedBlock);
break;
}
}
}
long bytesFreed = 0;
if (forceInMemory || memoryFactor > 0.999f) {
long s = bucketSingle.totalSize();
long m = bucketMulti.totalSize();
if (bytesToFree > (s + m)) {
// this means we need to evict blocks in memory bucket to make room,
// so the single and multi buckets will be emptied
bytesFreed = bucketSingle.free(s);
bytesFreed += bucketMulti.free(m);
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
" from single and multi buckets");
}
bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
" total from all three buckets ");
}
} else {
// this means no need to evict block in memory bucket,
// and we try best to make the ratio between single-bucket and
// multi-bucket is 1:2
long bytesRemain = s + m - bytesToFree;
if (3 * s <= bytesRemain) {
// single-bucket is small enough that no eviction happens for it
// hence all eviction goes from multi-bucket
bytesFreed = bucketMulti.free(bytesToFree);
} else if (3 * m <= 2 * bytesRemain) {
// multi-bucket is small enough that no eviction happens for it
// hence all eviction goes from single-bucket
bytesFreed = bucketSingle.free(bytesToFree);
} else {
// both buckets need to evict some blocks
bytesFreed = bucketSingle.free(s - bytesRemain / 3);
if (bytesFreed < bytesToFree) {
bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
}
}
}
} else {
PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);
int remainingBuckets = bucketQueue.size();
BlockBucket bucket;
while ((bucket = bucketQueue.poll()) != null) {
long overflow = bucket.overflow();
if (overflow > 0) {
long bucketBytesToFree =
Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
bytesFreed += bucket.free(bucketBytesToFree);
}
remainingBuckets--;
}
}
if (LOG.isTraceEnabled()) {
long single = bucketSingle.totalSize();
long multi = bucketMulti.totalSize();
long memory = bucketMemory.totalSize();
LOG.trace("Block cache LRU eviction completed; " +
"freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
"total=" + StringUtils.byteDesc(this.size.get()) + ", " +
"single=" + StringUtils.byteDesc(single) + ", " +
"multi=" + StringUtils.byteDesc(multi) + ", " +
"memory=" + StringUtils.byteDesc(memory));
}
} finally {
stats.evict();
evictionInProgress = false;
evictionLock.unlock();
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("blockCount", getBlockCount())
.add("currentSize", StringUtils.byteDesc(getCurrentSize()))
.add("freeSize", StringUtils.byteDesc(getFreeSize()))
.add("maxSize", StringUtils.byteDesc(getMaxSize()))
.add("heapSize", StringUtils.byteDesc(heapSize()))
.add("minSize", StringUtils.byteDesc(minSize()))
.add("minFactor", minFactor)
.add("multiSize", StringUtils.byteDesc(multiSize()))
.add("multiFactor", multiFactor)
.add("singleSize", StringUtils.byteDesc(singleSize()))
.add("singleFactor", singleFactor)
.toString();
}
/**
* Used to group blocks into priority buckets. There will be a BlockBucket
* for each priority (single, multi, memory). Once bucketed, the eviction
* algorithm takes the appropriate number of elements out of each according
* to configuration parameters and their relatives sizes.
*/
private class BlockBucket implements Comparable<BlockBucket> {
private final String name;
private LruCachedBlockQueue queue;
private long totalSize = 0;
private long bucketSize;
public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
this.name = name;
this.bucketSize = bucketSize;
queue = new LruCachedBlockQueue(bytesToFree, blockSize);
totalSize = 0;
}
public void add(LruCachedBlock block) {
totalSize += block.heapSize();
queue.add(block);
}
public long free(long toFree) {
if (LOG.isTraceEnabled()) {
LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
}
LruCachedBlock cb;
long freedBytes = 0;
while ((cb = queue.pollLast()) != null) {
freedBytes += evictBlock(cb, true);
if (freedBytes >= toFree) {
return freedBytes;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
}
return freedBytes;
}
public long overflow() {
return totalSize - bucketSize;
}
public long totalSize() {
return totalSize;
}
@Override
public int compareTo(BlockBucket that) {
return Long.compare(this.overflow(), that.overflow());
}
@Override
public boolean equals(Object that) {
if (that == null || !(that instanceof BlockBucket)) {
return false;
}
return compareTo((BlockBucket)that) == 0;
}
@Override
public int hashCode() {
return Objects.hashCode(name, bucketSize, queue, totalSize);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("totalSize", StringUtils.byteDesc(totalSize))
.add("bucketSize", StringUtils.byteDesc(bucketSize))
.toString();
}
}
/**
* Get the maximum size of this cache.
*
* @return max size in bytes
*/
@Override
public long getMaxSize() {
return this.maxSize;
}
@Override
public long getCurrentSize() {
return this.size.get();
}
@Override
public long getCurrentDataSize() {
return this.dataBlockSize.sum();
}
@Override
public long getFreeSize() {
return getMaxSize() - getCurrentSize();
}
@Override
public long size() {
return getMaxSize();
}
@Override
public long getBlockCount() {
return this.elements.get();
}
@Override
public long getDataBlockCount() {
return this.dataBlockElements.sum();
}
EvictionThread getEvictionThread() {
return this.evictionThread;
}
/*
* Eviction thread. Sits in waiting state until an eviction is triggered
* when the cache size grows above the acceptable level.<p>
*
* Thread is triggered into action by {@link LruBlockCache#runEviction()}
*/
static class EvictionThread extends Thread {
private WeakReference<LruBlockCache> cache;
private volatile boolean go = true;
// flag set after enter the run method, used for test
private boolean enteringRun = false;
public EvictionThread(LruBlockCache cache) {
super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
setDaemon(true);
this.cache = new WeakReference<>(cache);
}
@Override
public void run() {
enteringRun = true;
while (this.go) {
synchronized (this) {
try {
this.wait(1000 * 10/*Don't wait for ever*/);
} catch (InterruptedException e) {
LOG.warn("Interrupted eviction thread ", e);
Thread.currentThread().interrupt();
}
}
LruBlockCache cache = this.cache.get();
if (cache == null) {
break;
}
cache.evict();
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="This is what we want")
public void evict() {
synchronized (this) {
this.notifyAll();
}
}
synchronized void shutdown() {
this.go = false;
this.notifyAll();
}
/**
* Used for the test.
*/
boolean isEnteringRun() {
return this.enteringRun;
}
}
/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
static class StatisticsThread extends Thread {
private final LruBlockCache lru;
public StatisticsThread(LruBlockCache lru) {
super("LruBlockCacheStats");
setDaemon(true);
this.lru = lru;
}
@Override
public void run() {
lru.logStats();
}
}
public void logStats() {
// Log size
long totalSize = heapSize();
long freeSize = maxSize - totalSize;
LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
"freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " +
"blockCount=" + getBlockCount() + ", " +
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" + (stats.getHitCount() == 0 ?
"0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
"0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
"evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount() + ", " +
"evictedPerRun=" + stats.evictedPerEviction());
}
/**
* Get counter statistics for this cache.
*
* <p>Includes: total accesses, hits, misses, evicted blocks, and runs
* of the eviction processes.
*/
@Override
public CacheStats getStats() {
return this.stats;
}
public final static long CACHE_FIXED_OVERHEAD =
ClassSize.estimateBase(LruBlockCache.class, false);
@Override
public long heapSize() {
return getCurrentSize();
}
private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
// FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
+ ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+ ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
}
@Override
public Iterator<CachedBlock> iterator() {
final Iterator<LruCachedBlock> iterator = map.values().iterator();
return new Iterator<CachedBlock>() {
private final long now = System.nanoTime();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public CachedBlock next() {
final LruCachedBlock b = iterator.next();
return new CachedBlock() {
@Override
public String toString() {
return BlockCacheUtil.toString(this, now);
}
@Override
public BlockPriority getBlockPriority() {
return b.getPriority();
}
@Override
public BlockType getBlockType() {
return b.getBuffer().getBlockType();
}
@Override
public long getOffset() {
return b.getCacheKey().getOffset();
}
@Override
public long getSize() {
return b.getBuffer().heapSize();
}
@Override
public long getCachedTime() {
return b.getCachedTime();
}
@Override
public String getFilename() {
return b.getCacheKey().getHfileName();
}
@Override
public int compareTo(CachedBlock other) {
int diff = this.getFilename().compareTo(other.getFilename());
if (diff != 0) {
return diff;
}
diff = Long.compare(this.getOffset(), other.getOffset());
if (diff != 0) {
return diff;
}
if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
}
return Long.compare(other.getCachedTime(), this.getCachedTime());
}
@Override
public int hashCode() {
return b.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CachedBlock) {
CachedBlock cb = (CachedBlock)obj;
return compareTo(cb) == 0;
} else {
return false;
}
}
};
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
// Simple calculators of sizes given factors and maxSize
long acceptableSize() {
return (long)Math.floor(this.maxSize * this.acceptableFactor);
}
private long minSize() {
return (long)Math.floor(this.maxSize * this.minFactor);
}
private long singleSize() {
return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
}
private long multiSize() {
return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
}
private long memorySize() {
return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
}
@Override
public void shutdown() {
if (victimHandler != null) {
victimHandler.shutdown();
}
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.warn("Interrupted while sleeping");
Thread.currentThread().interrupt();
break;
}
}
}
if (!this.scheduleThreadPool.isShutdown()) {
List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
LOG.debug("Still running " + runnables);
}
this.evictionThread.shutdown();
}
/** Clears the cache. Used in tests. */
public void clearCache() {
this.map.clear();
this.elements.set(0);
}
/**
* Used in testing. May be very inefficient.
*
* @return the set of cached file names
*/
SortedSet<String> getCachedFileNamesForTest() {
SortedSet<String> fileNames = new TreeSet<>();
for (BlockCacheKey cacheKey : map.keySet()) {
fileNames.add(cacheKey.getHfileName());
}
return fileNames;
}
public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
for (LruCachedBlock block : map.values()) {
DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
Integer count = counts.get(encoding);
counts.put(encoding, (count == null ? 0 : count) + 1);
}
return counts;
}
Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
return map;
}
@Override
public BlockCache[] getBlockCaches() {
if (victimHandler != null) {
return new BlockCache[] { this, this.victimHandler };
}
return null;
}
}