/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static java.util.Objects.requireNonNull;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy.Eviction;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
* eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
* O(1) read and write operations.
* <ul>
* <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
* <li>Caffeine: https://github.com/ben-manes/caffeine</li>
* <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
* </ul>
*/
@InterfaceAudience.Private
public final class TinyLfuBlockCache implements FirstLevelBlockCache {
private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class);
private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;
private final Eviction<BlockCacheKey, Cacheable> policy;
private final ScheduledExecutorService statsThreadPool;
private final long maxBlockSize;
private final CacheStats stats;
private BlockCache victimCache;
final Cache<BlockCacheKey, Cacheable> cache;
/**
* Creates a block cache.
*
* @param maximumSizeInBytes maximum size of this cache, in bytes
* @param avgBlockSize expected average size of blocks, in bytes
* @param executor the cache's executor
* @param conf additional configuration
*/
public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize,
Executor executor, Configuration conf) {
this(maximumSizeInBytes, avgBlockSize,
conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor);
}
/**
* Creates a block cache.
*
* @param maximumSizeInBytes maximum size of this cache, in bytes
* @param avgBlockSize expected average size of blocks, in bytes
* @param maxBlockSize maximum size of a block, in bytes
* @param executor the cache's executor
*/
public TinyLfuBlockCache(long maximumSizeInBytes,
long avgBlockSize, long maxBlockSize, Executor executor) {
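    // The backing Caffeine cache is bounded by total weight rather than entry count: each block
    // weighs its on-heap size, capped at Integer.MAX_VALUE because the weigher returns an int.
    // The initial capacity assumes roughly 20% more entries than maximumSizeInBytes / avgBlockSize
    // to limit hash table resizing, and evictions and removal notifications run on the supplied
    // executor.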
this.cache = Caffeine.newBuilder()
.executor(executor)
.maximumWeight(maximumSizeInBytes)
.removalListener(new EvictionListener())
.weigher((BlockCacheKey key, Cacheable value) ->
(int) Math.min(value.heapSize(), Integer.MAX_VALUE))
.initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize))
.build();
this.maxBlockSize = maxBlockSize;
this.policy = cache.policy().eviction().get();
this.stats = new CacheStats(getClass().getSimpleName());
statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
statsThreadPool.scheduleAtFixedRate(this::logStats,
STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
}
@Override
public void setVictimCache(BlockCache victimCache) {
if (this.victimCache != null) {
throw new IllegalArgumentException("The victim cache has already been set");
}
this.victimCache = requireNonNull(victimCache);
}
@Override
public long size() {
return policy.getMaximum();
}
@Override
public long getFreeSize() {
return size() - getCurrentSize();
}
@Override
public long getCurrentSize() {
return policy.weightedSize().getAsLong();
}
@Override
public long getBlockCount() {
return cache.estimatedSize();
}
@Override
public long heapSize() {
return getCurrentSize();
}
@Override
public void setMaxSize(long size) {
policy.setMaximum(size);
}
@Override
public boolean containsBlock(BlockCacheKey cacheKey) {
return cache.asMap().containsKey(cacheKey);
}
@Override
public Cacheable getBlock(BlockCacheKey cacheKey,
boolean caching, boolean repeat, boolean updateCacheMetrics) {
Cacheable value = cache.getIfPresent(cacheKey);
if (value == null) {
if (repeat) {
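        // Repeat lookup for a block already known to be absent; do not re-count the miss or
        // consult the victim cache.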
return null;
}
if (updateCacheMetrics) {
stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
if (victimCache != null) {
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
if ((value != null) && caching) {
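          // Blocks served by the victim cache may be backed by shared (possibly off-heap) memory;
          // deep-clone them on heap before promoting into this on-heap cache.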
if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
}
cacheBlock(cacheKey, value);
}
}
} else if (updateCacheMetrics) {
stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
return value;
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) {
cacheBlock(cacheKey, value);
}
@Override
public void cacheBlock(BlockCacheKey key, Cacheable value) {
if (value.heapSize() > maxBlockSize) {
// If there are a lot of blocks that are too big this can make the logs too noisy (2% logged)
if (stats.failInsert() % 50 == 0) {
        LOG.warn(String.format(
          "Trying to cache too large a block %s @ %,d; size %,d exceeds the maximum of %,d",
          key.getHfileName(), key.getOffset(), value.heapSize(), maxBlockSize));
}
} else {
cache.put(key, value);
}
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
Cacheable value = cache.asMap().remove(cacheKey);
return (value != null);
}
@Override
public int evictBlocksByHfileName(String hfileName) {
int evicted = 0;
for (BlockCacheKey key : cache.asMap().keySet()) {
if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
evicted++;
}
}
if (victimCache != null) {
evicted += victimCache.evictBlocksByHfileName(hfileName);
}
return evicted;
}
@Override
public CacheStats getStats() {
return stats;
}
@Override
public void shutdown() {
if (victimCache != null) {
victimCache.shutdown();
}
statsThreadPool.shutdown();
}
@Override
public BlockCache[] getBlockCaches() {
return null;
}
@Override
public Iterator<CachedBlock> iterator() {
long now = System.nanoTime();
return cache.asMap().entrySet().stream()
.map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
.iterator();
}
private void logStats() {
LOG.info(
"totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +
"freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " +
"max=" + StringUtils.byteDesc(size()) + ", " +
"blockCount=" + getBlockCount() + ", " +
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" + (stats.getHitCount() == 0 ?
"0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
"0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
"evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("blockCount", getBlockCount())
.add("currentSize", getCurrentSize())
.add("freeSize", getFreeSize())
.add("maxSize", size())
.add("heapSize", heapSize())
.add("victimCache", (victimCache != null))
.toString();
}
/** A removal listener to asynchronously record evictions and populate the victim cache. */
private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {
@Override
public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
if (!cause.wasEvicted()) {
// An explicit eviction (invalidation) is not added to the victim cache as the data may
// no longer be valid for subsequent queries.
return;
}
recordEviction();
if (victimCache == null) {
return;
} else if (victimCache instanceof BucketCache) {
BucketCache victimBucketCache = (BucketCache) victimCache;
victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
} else {
victimCache.cacheBlock(key, value);
}
}
}
/**
   * Records an eviction. The number of eviction operations and the number of evicted blocks are
   * identical, as an eviction is triggered immediately once the capacity has been exceeded. The
   * eviction itself is performed asynchronously on the cache's executor; see the Caffeine
   * documentation for details on write buffers, batching, and maintenance behavior.
*/
private void recordEviction() {
// FIXME: Currently does not capture the insertion time
stats.evicted(Long.MAX_VALUE, true);
stats.evict();
}
private static final class CachedBlockView implements CachedBlock {
private static final Comparator<CachedBlock> COMPARATOR = Comparator
.comparing(CachedBlock::getFilename)
.thenComparing(CachedBlock::getOffset)
.thenComparing(CachedBlock::getCachedTime);
private final BlockCacheKey key;
private final Cacheable value;
private final long now;
public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
this.now = now;
this.key = key;
this.value = value;
}
@Override
public BlockPriority getBlockPriority() {
// This does not appear to be used in any meaningful way and is irrelevant to this cache
return BlockPriority.MEMORY;
}
@Override
public BlockType getBlockType() {
return value.getBlockType();
}
@Override
public long getOffset() {
return key.getOffset();
}
@Override
public long getSize() {
return value.heapSize();
}
@Override
public long getCachedTime() {
// This does not appear to be used in any meaningful way, so not captured
return 0L;
}
@Override
public String getFilename() {
return key.getHfileName();
}
@Override
public int compareTo(CachedBlock other) {
return COMPARATOR.compare(this, other);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof CachedBlock)) {
return false;
}
CachedBlock other = (CachedBlock) obj;
return compareTo(other) == 0;
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public String toString() {
return BlockCacheUtil.toString(this, now);
}
}
@Override
public long getMaxSize() {
return size();
}
@Override
public long getCurrentDataSize() {
return getCurrentSize();
}
@Override
public long getDataBlockCount() {
return getBlockCount();
}
}