/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.file.blockfile.cache.tinylfu;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;

import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager.Configuration;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.cache.CacheEntry.Weighable;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.stats.CacheStats;

/**
* A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation
* delegates to a Caffeine cache to provide concurrent O(1) read and write operations.
* <ul>
* <li><a href="https://arxiv.org/pdf/1512.00727.pdf">W-TinyLFU</a></li>
* <li><a href="https://github.com/ben-manes/caffeine">Caffeine</a></li>
* <li><a href="https://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html">Cache
* design</a></li>
* </ul>
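*
* <p>
* A minimal usage sketch, assuming a {@code Configuration} supplied by the surrounding
* {@code BlockCacheManager} (the variable names below are illustrative only):
*
* <pre>{@code
* BlockCache dataCache = new TinyLfuBlockCache(conf, CacheType.DATA);
* dataCache.cacheBlock("block-1", blockBytes);
* CacheEntry entry = dataCache.getBlock("block-1"); // null if absent or already evicted
* }</pre>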
*/
public final class TinyLfuBlockCache implements BlockCache {
private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class);
private static final int STATS_PERIOD_SEC = 60;
private final Cache<String,Block> cache;
private final Policy.Eviction<String,Block> policy;
private final int maxSize;
private final ScheduledExecutorService statsExecutor = ThreadPools.getServerThreadPools()
.createScheduledExecutorService(1, "TinyLfuBlockCacheStatsExecutor");

public TinyLfuBlockCache(Configuration conf, CacheType type) {
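// Size the hash table for ~20% more entries than expected at capacity to reduce resizing,
// and weigh each entry by its approximate on-heap footprint (key plus block) so that
// maximumWeight bounds memory use rather than entry count.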
cache = Caffeine.newBuilder()
.initialCapacity((int) Math.ceil(1.2 * conf.getMaxSize(type) / conf.getBlockSize()))
.weigher((String blockName, Block block) -> {
int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
return keyWeight + block.weight();
}).maximumWeight(conf.getMaxSize(type)).recordStats().build();
policy = cache.policy().eviction().orElseThrow();
maxSize = (int) Math.min(Integer.MAX_VALUE, policy.getMaximum());
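// Periodically log cache utilization and hit/miss statistics at debug level; the stats
// task is watched as non-critical.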
ScheduledFuture<?> future = statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC,
STATS_PERIOD_SEC, SECONDS);
ThreadPools.watchNonCriticalScheduledTask(future);
}

@Override
public long getMaxHeapSize() {
return getMaxSize();
}

@Override
public long getMaxSize() {
return maxSize;
}

@Override
public CacheEntry getBlock(String blockName) {
return wrap(blockName, cache.getIfPresent(blockName));
}

@Override
public CacheEntry cacheBlock(String blockName, byte[] buffer) {
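// compute() atomically replaces any existing mapping and returns the new Block so it can
// be wrapped; Caffeine re-weighs the entry on insert or replace.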
return wrap(blockName, cache.asMap().compute(blockName, (key, block) -> {
return new Block(buffer);
}));
}

@Override
public BlockCache.Stats getStats() {
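// cache.stats() returns an immutable snapshot; adapt it to the BlockCache.Stats interface.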
CacheStats stats = cache.stats();
return new BlockCache.Stats() {
@Override
public long hitCount() {
return stats.hitCount();
}
@Override
public long requestCount() {
return stats.requestCount();
}
@Override
public long evictionCount() {
return stats.evictionCount();
}
};
}

private void logStats() {
double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
double sizeMB = ((double) policy.weightedSize().orElse(0)) / ((double) (1024 * 1024));
double freeMB = maxMB - sizeMB;
log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB,
cache.estimatedSize());
log.debug(cache.stats().toString());
}
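
// Cache value: the block bytes plus an optional, lazily created index whose weight is
// tracked so the cache can charge for it.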
private static final class Block {
private final byte[] buffer;
private Weighable index;
private volatile int lastIndexWeight;

Block(byte[] buffer) {
this.buffer = buffer;
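// Start with a guess of roughly 1% of the data size for the index weight; it is corrected
// once the real weight is reported through indexWeightChanged().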
this.lastIndexWeight = buffer.length / 100;
}
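
// Approximate on-heap footprint of this entry: the lazily created index (plus the int and
// reference that track it), the aligned data array, and this object's own overhead.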
int weight() {
int indexWeight = lastIndexWeight + SizeConstants.SIZEOF_INT + ClassSize.REFERENCE;
return indexWeight + ClassSize.align(getBuffer().length) + SizeConstants.SIZEOF_LONG
+ ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
}

public byte[] getBuffer() {
return buffer;
}

@SuppressWarnings("unchecked")
public synchronized <T extends Weighable> T getIndex(Supplier<T> supplier) {
if (index == null) {
index = supplier.get();
}
return (T) index;
}
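
// Returns true only when the index has grown, signaling the caller to re-insert the block
// so the cache accounts for the larger weight.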
public synchronized boolean indexWeightChanged() {
if (index != null) {
int indexWeight = index.weight();
if (indexWeight > lastIndexWeight) {
lastIndexWeight = indexWeight;
return true;
}
}
return false;
}
}

private CacheEntry wrap(String cacheKey, Block block) {
if (block != null) {
return new TlfuCacheEntry(cacheKey, block);
}
return null;
}

private class TlfuCacheEntry implements CacheEntry {
private final String cacheKey;
private final Block block;

TlfuCacheEntry(String k, Block b) {
this.cacheKey = k;
this.block = b;
}

@Override
public byte[] getBuffer() {
return block.getBuffer();
}

@Override
public <T extends Weighable> T getIndex(Supplier<T> supplier) {
return block.getIndex(supplier);
}

@Override
public void indexWeightChanged() {
if (block.indexWeightChanged()) {
// Re-insert the block so Caffeine re-runs the weigher and records the larger index weight.
cache.put(cacheKey, block);
}
}
}

private Block load(Loader loader, Map<String,byte[]> resolvedDeps) {
byte[] data = loader.load(maxSize, resolvedDeps);
return data == null ? null : new Block(data);
}
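
// Fetch (or load) every dependency through the cache. Returns null if any dependency
// cannot be resolved, which aborts the dependent load; the single-entry case avoids
// allocating a HashMap.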
private Map<String,byte[]> resolveDependencies(Map<String,Loader> deps) {
if (deps.size() == 1) {
Entry<String,Loader> entry = deps.entrySet().iterator().next();
CacheEntry ce = getBlock(entry.getKey(), entry.getValue());
if (ce == null) {
return null;
}
return Collections.singletonMap(entry.getKey(), ce.getBuffer());
} else {
HashMap<String,byte[]> resolvedDeps = new HashMap<>();
for (Entry<String,Loader> entry : deps.entrySet()) {
CacheEntry ce = getBlock(entry.getKey(), entry.getValue());
if (ce == null) {
return null;
}
resolvedDeps.put(entry.getKey(), ce.getBuffer());
}
return resolvedDeps;
}
}

@Override
public CacheEntry getBlock(String blockName, Loader loader) {
Map<String,Loader> deps = loader.getDependencies();
Block block;
if (deps.isEmpty()) {
block = cache.get(blockName, k -> load(loader, Collections.emptyMap()));
} else {
// This code path exists to handle the case where dependencies may need to be loaded.
// Loading dependencies will access the cache, and cache load functions must not access
// the cache.
block = cache.getIfPresent(blockName);
if (block == null) {
// Load dependencies outside of cache load function.
Map<String,byte[]> resolvedDeps = resolveDependencies(deps);
if (resolvedDeps == null) {
return null;
}
// Use asMap() because it does not increment stats; the getIfPresent call above already
// recorded a miss. Use computeIfAbsent because another thread may have loaded the data
// since this thread called getIfPresent.
block = cache.asMap().computeIfAbsent(blockName, k -> load(loader, resolvedDeps));
}
}
return wrap(blockName, block);
}
}