/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.search;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy.Eviction;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.solr.common.SolrException;
import org.apache.solr.metrics.MetricsMap;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.util.IOFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SolrCache} backed by the Caffeine caching library [1]. By default it uses the
 * Window TinyLFU (W-TinyLFU) eviction policy.
 *
 * <p>This cache supports either a maximum size limit (the number of entries) or a maximum RAM
 * bytes limit, but not both. If both values are set, only the maxRamMB limit is used and the
 * maximum size limit is ignored.</p>
 * <p>
 * W-TinyLFU [2] is a near-optimal policy that uses recency and frequency to determine which
 * entry to evict in O(1) time. The estimated frequency is retained in a Count-Min Sketch and
 * entries reside on LRU priority queues [3]. By capturing the historic frequency of an entry,
 * the cache is able to outperform classic policies like LRU and LFU, as well as modern policies
 * like ARC and LIRS. This policy performs particularly well on search workloads.
 * <p>
 * [1] https://github.com/ben-manes/caffeine
 * [2] http://arxiv.org/pdf/1512.00727.pdf
 * [3] http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html
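 *
 * <p>For illustration only, a minimal programmatic sketch of configuring and using the cache
 * (in a real Solr install the cache is declared in solrconfig.xml and these argument keys
 * arrive as XML attributes; the key/value types and the cached value below are placeholders):
 * <pre>{@code
 *   CaffeineCache<String, Object> cache = new CaffeineCache<>();
 *   Map<String, String> args = new HashMap<>();
 *   args.put("size", "512");          // max entry count; ignored when maxRamMB is set
 *   args.put("initialSize", "512");
 *   args.put("autowarmCount", "128"); // how many hot entries warm() regenerates
 *   cache.init(args, null, null);     // no persistence state, no regenerator
 *   cache.put("*:*", cachedValue);    // cachedValue: any previously computed result
 *   Object v = cache.get("*:*");
 * }</pre>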
*/
public class CaffeineCache<K, V> extends SolrCacheBase implements SolrCache<K, V>, Accountable, RemovalListener<K, V> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CaffeineCache.class)
+ RamUsageEstimator.shallowSizeOfInstance(CacheStats.class)
+ 2 * RamUsageEstimator.shallowSizeOfInstance(LongAdder.class);
private static final long RAM_BYTES_PER_FUTURE = RamUsageEstimator.shallowSizeOfInstance(CompletableFuture.class);
private Executor executor;
private CacheStats priorStats;
private long priorHits;
private long priorInserts;
private long priorLookups;
private String description = "Caffeine Cache";
private LongAdder hits;
private LongAdder inserts;
private LongAdder lookups;
private Cache<K,V> cache;
private AsyncCache<K,V> asyncCache;
private long warmupTime;
private int maxSize;
private long maxRamBytes;
private int initialSize;
private int maxIdleTimeSec;
private boolean cleanupThread;
private boolean async;
private Set<String> metricNames = ConcurrentHashMap.newKeySet();
private MetricsMap cacheMap;
private SolrMetricsContext solrMetricsContext;
private long initialRamBytes = 0;
private final LongAdder ramBytes = new LongAdder();
public CaffeineCache() {
this.priorStats = CacheStats.empty();
}
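  /**
   * Initializes the cache from the standard {@link SolrCache} parameters and builds the backing
   * Caffeine cache. In async mode a same-thread executor is used, so each computation completes
   * on the thread that reserved its future rather than on a pool thread.
   */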
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Object init(Map args, Object persistence, CacheRegenerator regenerator) {
super.init(args, regenerator);
String str = (String) args.get(SIZE_PARAM);
maxSize = (str == null) ? 1024 : Integer.parseInt(str);
str = (String) args.get(INITIAL_SIZE_PARAM);
initialSize = Math.min((str == null) ? 1024 : Integer.parseInt(str), maxSize);
str = (String) args.get(MAX_IDLE_TIME_PARAM);
if (str == null) {
maxIdleTimeSec = -1;
} else {
maxIdleTimeSec = Integer.parseInt(str);
}
str = (String) args.get(MAX_RAM_MB_PARAM);
int maxRamMB = str == null ? -1 : Double.valueOf(str).intValue();
maxRamBytes = maxRamMB < 0 ? Long.MAX_VALUE : maxRamMB * 1024L * 1024L;
cleanupThread = Boolean.parseBoolean((String) args.get(CLEANUP_THREAD_PARAM));
async = Boolean.parseBoolean((String) args.getOrDefault(ASYNC_PARAM, "true"));
if (async) {
      // Futures are recorded in the map to reduce bucket-lock contention, but the computation
      // itself must run on the calling thread
executor = Runnable::run;
} else if (cleanupThread) {
executor = ForkJoinPool.commonPool();
} else {
executor = Runnable::run;
}
description = generateDescription(maxSize, initialSize);
cache = buildCache(null);
hits = new LongAdder();
inserts = new LongAdder();
lookups = new LongAdder();
initialRamBytes =
RamUsageEstimator.shallowSizeOfInstance(cache.getClass()) +
RamUsageEstimator.shallowSizeOfInstance(executor.getClass()) +
RamUsageEstimator.sizeOfObject(description);
return persistence;
}
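  /**
   * Builds the backing Caffeine cache from the current settings: weight-bounded by estimated
   * entry size when maxRamBytes is set, count-bounded by maxSize otherwise, optionally with
   * idle-time expiration. If a previous cache is supplied (e.g. when limits change at runtime),
   * its entries are carried over into the new instance.
   */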
@SuppressWarnings({"unchecked"})
private Cache<K, V> buildCache(Cache<K, V> prev) {
@SuppressWarnings({"rawtypes"})
Caffeine builder = Caffeine.newBuilder()
.initialCapacity(initialSize)
.executor(executor)
.removalListener(this)
.recordStats();
if (maxIdleTimeSec > 0) {
builder.expireAfterAccess(Duration.ofSeconds(maxIdleTimeSec));
}
if (maxRamBytes != Long.MAX_VALUE) {
builder.maximumWeight(maxRamBytes);
builder.weigher((k, v) -> (int) (RamUsageEstimator.sizeOfObject(k) + RamUsageEstimator.sizeOfObject(v)));
} else {
builder.maximumSize(maxSize);
}
Cache<K, V> newCache;
if (async) {
asyncCache = builder.buildAsync();
newCache = asyncCache.synchronous();
} else {
newCache = builder.build();
}
if (prev != null) {
newCache.putAll(prev.asMap());
}
return newCache;
}
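  /**
   * Caffeine removal listener: credits back the removed entry's estimated footprint against the
   * running RAM accounting. Invoked for evictions and expirations as well as explicit removals.
   */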
@Override
public void onRemoval(K key, V value, RemovalCause cause) {
ramBytes.add(
- (RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED) +
RamUsageEstimator.sizeOfObject(value, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED) +
RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY)
);
}
@Override
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED + initialRamBytes + ramBytes.sum();
}
@Override
public V get(K key) {
return cache.getIfPresent(key);
}
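  /**
   * Single computation per key without holding a bucket lock during the work: each caller tries
   * to reserve the slot by putIfAbsent-ing an incomplete future. The winner computes the value
   * and completes the future; losers block on the winner's future and record a hit, so all
   * callers observe the same value (or the same IOException).
   */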
private V computeAsync(K key, IOFunction<? super K, ? extends V> mappingFunction) throws IOException {
CompletableFuture<V> future = new CompletableFuture<>();
CompletableFuture<V> result = asyncCache.asMap().putIfAbsent(key, future);
lookups.increment();
if (result != null) {
try {
        // Another thread is already computing this value; wait for it to finish
V value = result.join();
hits.increment();
return value;
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
// Computation had an IOException, likely index problems, so fail this result too
throw (IOException) cause;
}
throw e;
}
}
try {
// We reserved the slot, so we do the work
V value = mappingFunction.apply(key);
future.complete(value); // This will update the weight and expiration
recordRamBytes(key, null, value);
inserts.increment();
return value;
} catch (Error | RuntimeException | IOException e) {
      // TimeExceeded exceptions are RuntimeExceptions and will bubble up from here
future.completeExceptionally(e);
throw e;
}
}
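  /**
   * Computes the value at most once per key: in async mode coordination happens through futures
   * (see {@link #computeAsync}); otherwise Caffeine's atomic {@code get(key, fn)} is used, with
   * IOExceptions tunneled out of the non-throwing mapping function via
   * {@link UncheckedIOException}.
   */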
@Override
public V computeIfAbsent(K key, IOFunction<? super K, ? extends V> mappingFunction) throws IOException {
if (async) {
return computeAsync(key, mappingFunction);
}
try {
return cache.get(key, k -> {
V value;
try {
value = mappingFunction.apply(k);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (value == null) {
return null;
}
recordRamBytes(key, null, value);
inserts.increment();
return value;
});
} catch (UncheckedIOException e) {
throw e.getCause();
}
}
@Override
public V put(K key, V val) {
inserts.increment();
V old = cache.asMap().put(key, val);
recordRamBytes(key, old, val);
return old;
}
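  /**
   * Updates the RAM accounting for an inserted or replaced entry: the per-entry hashtable
   * overhead (plus, in async mode, the wrapping CompletableFuture) is charged only for new
   * entries, and on replacement the old value's estimated size is credited back.
   */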
private void recordRamBytes(K key, V oldValue, V newValue) {
ramBytes.add(RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED) +
RamUsageEstimator.sizeOfObject(newValue, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
if (oldValue == null) {
ramBytes.add(RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY);
if (async) ramBytes.add(RAM_BYTES_PER_FUTURE);
} else {
ramBytes.add(- RamUsageEstimator.sizeOfObject(oldValue, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
}
}
@Override
public V remove(K key) {
V existing = cache.asMap().remove(key);
if (existing != null) {
ramBytes.add(- RamUsageEstimator.sizeOfObject(key, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
ramBytes.add(- RamUsageEstimator.sizeOfObject(existing, RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_USED));
ramBytes.add(- RamUsageEstimator.LINKED_HASHTABLE_RAM_BYTES_PER_ENTRY);
}
return existing;
}
@Override
public void clear() {
cache.invalidateAll();
ramBytes.reset();
}
@Override
public int size() {
return cache.asMap().size();
}
@Override
public void close() throws IOException {
SolrCache.super.close();
cache.invalidateAll();
cache.cleanUp();
if (executor instanceof ExecutorService) {
((ExecutorService)executor).shutdownNow();
}
ramBytes.reset();
}
@Override
public int getMaxSize() {
return maxSize;
}
@Override
public void setMaxSize(int maxSize) {
if (this.maxSize == maxSize) {
return;
}
Optional<Eviction<K, V>> evictionOpt = cache.policy().eviction();
if (evictionOpt.isPresent()) {
Eviction<K, V> eviction = evictionOpt.get();
eviction.setMaximum(maxSize);
this.maxSize = maxSize;
initialSize = Math.min(1024, this.maxSize);
description = generateDescription(this.maxSize, initialSize);
cache.cleanUp();
}
}
@Override
public int getMaxRamMB() {
return maxRamBytes != Long.MAX_VALUE ? (int) (maxRamBytes / 1024L / 1024L) : -1;
}
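  /**
   * Switching between a count-based and a weight-based limit cannot be done in place because
   * the weigher is fixed when the Caffeine cache is built, so those transitions rebuild the
   * cache and migrate the existing entries; a pure weight change only adjusts the eviction
   * maximum.
   */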
@Override
public void setMaxRamMB(int maxRamMB) {
long newMaxRamBytes = maxRamMB < 0 ? Long.MAX_VALUE : maxRamMB * 1024L * 1024L;
if (newMaxRamBytes != maxRamBytes) {
maxRamBytes = newMaxRamBytes;
Optional<Eviction<K, V>> evictionOpt = cache.policy().eviction();
if (evictionOpt.isPresent()) {
Eviction<K, V> eviction = evictionOpt.get();
if (!eviction.isWeighted()) {
// rebuild cache using weigher
cache = buildCache(cache);
return;
} else if (maxRamBytes == Long.MAX_VALUE) {
// rebuild cache using maxSize
cache = buildCache(cache);
return;
}
eviction.setMaximum(newMaxRamBytes);
description = generateDescription(this.maxSize, initialSize);
cache.cleanUp();
}
}
}
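  /**
   * Autowarms this cache from the old searcher's cache by regenerating its hottest entries, as
   * ranked by the eviction policy, then folds the old instance's counters into this instance's
   * cumulative (prior) statistics.
   */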
@Override
public void warm(SolrIndexSearcher searcher, SolrCache<K,V> old) {
if (regenerator == null) {
return;
}
long warmingStartTime = System.nanoTime();
Map<K, V> hottest = Collections.emptyMap();
CaffeineCache<K,V> other = (CaffeineCache<K,V>)old;
// warm entries
if (isAutowarmingOn()) {
int size = autowarm.getWarmCount(other.cache.asMap().size());
hottest = other.cache.policy().eviction().map(p -> p.hottest(size)).orElse(Collections.emptyMap());
}
for (Entry<K, V> entry : hottest.entrySet()) {
try {
boolean continueRegen = regenerator.regenerateItem(
searcher, this, old, entry.getKey(), entry.getValue());
if (!continueRegen) {
break;
}
}
catch (Exception e) {
SolrException.log(log, "Error during auto-warming of key:" + entry.getKey(), e);
}
}
hits.reset();
inserts.reset();
lookups.reset();
CacheStats oldStats = other.cache.stats();
priorStats = oldStats.plus(other.priorStats);
priorHits = oldStats.hitCount() + other.hits.sum() + other.priorHits;
priorInserts = other.inserts.sum() + other.priorInserts;
priorLookups = oldStats.requestCount() + other.lookups.sum() + other.priorLookups;
warmupTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - warmingStartTime, TimeUnit.NANOSECONDS);
}
  /** Generates a human-readable description of this cache's configuration. */
private String generateDescription(int limit, int initialSize) {
return String.format(Locale.ROOT, "Caffeine Cache(maxSize=%d, initialSize=%d%s)",
limit, initialSize, isAutowarmingOn() ? (", " + getAutowarmDescription()) : "");
}
@Override
public boolean isRecursionSupported() {
return async;
}
//////////////////////// SolrInfoBean methods //////////////////////
@Override
public String getName() {
return CaffeineCache.class.getName();
}
@Override
public String getDescription() {
return description;
}
// for unit tests only
@VisibleForTesting
MetricsMap getMetricsMap() {
return cacheMap;
}
@Override
public SolrMetricsContext getSolrMetricsContext() {
return solrMetricsContext;
}
@Override
public String toString() {
return name() + (cacheMap != null ? cacheMap.getValue().toString() : "");
}
@Override
public Set<String> getMetricNames() {
return metricNames;
}
@Override
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
solrMetricsContext = parentContext.getChildContext(this);
cacheMap = new MetricsMap(map -> {
if (cache != null) {
CacheStats stats = cache.stats();
long hitCount = stats.hitCount() + hits.sum();
long insertCount = inserts.sum();
long lookupCount = stats.requestCount() + lookups.sum();
map.put(LOOKUPS_PARAM, lookupCount);
map.put(HITS_PARAM, hitCount);
map.put(HIT_RATIO_PARAM, hitRate(hitCount, lookupCount));
map.put(INSERTS_PARAM, insertCount);
map.put(EVICTIONS_PARAM, stats.evictionCount());
map.put(SIZE_PARAM, cache.asMap().size());
map.put("warmupTime", warmupTime);
map.put(RAM_BYTES_USED_PARAM, ramBytesUsed());
map.put(MAX_RAM_MB_PARAM, getMaxRamMB());
CacheStats cumulativeStats = priorStats.plus(stats);
long cumLookups = priorLookups + lookupCount;
long cumHits = priorHits + hitCount;
map.put("cumulative_lookups", cumLookups);
map.put("cumulative_hits", cumHits);
map.put("cumulative_hitratio", hitRate(cumHits, cumLookups));
map.put("cumulative_inserts", priorInserts + insertCount);
map.put("cumulative_evictions", cumulativeStats.evictionCount());
}
});
solrMetricsContext.gauge(this, cacheMap, true, scope, getCategory().toString());
}
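  /** A cache that has seen no lookups reports a hit ratio of 1.0, i.e. nothing has ever missed. */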
private static double hitRate(long hitCount, long lookupCount) {
return lookupCount == 0 ? 1.0 : (double) hitCount / lookupCount;
}
}