blob: 4df170f50847ee56f2204f3d5683a0a5f0682d09 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache;
import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.cache.RemovalCause.COLLECTED;
import static com.google.common.cache.RemovalCause.EXPIRED;
import static com.google.common.cache.RemovalCause.SIZE;
import static com.google.common.collect.Iterables.filter;
import static java.util.Collections.singleton;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
import org.jetbrains.annotations.Nullable;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheStats;
import com.google.common.cache.RemovalCause;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link Cache} that fronts an in-memory cache ({@link #memCache}) with a
 * persistent, multi-generation map ({@link #map}).
 * <p>
 * Lookups are served from the in-memory cache first and fall back to the
 * persistent map. Writes to the persistent map happen either immediately
 * (synchronous mode) or are queued when an entry is evicted from the
 * in-memory cache (asynchronous mode, see {@link #evicted}).
 *
 * @param <K> the key type
 * @param <V> the value type
 */
class NodeCache<K extends CacheValue, V extends CacheValue>
implements Cache<K, V>, GenerationCache, EvictionListener<K, V> {
static final Logger LOG = LoggerFactory.getLogger(NodeCache.class);
// removal causes that evicted() treats as candidates for persisting
private static final Set<RemovalCause> EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE);
// the owning persistent cache: opens generation maps, switches generations, broadcasts
private final PersistentCache cache;
// statistics collected for this persistent cache
private final PersistentCacheStats stats;
// the in-memory cache that is consulted first
private final Cache<K, V> memCache;
// the persistent map, spanning all open generations
private final MultiGenerationMap<K, V> map;
// the cache type; supplies the map name and the shouldCache() filter
private final CacheType type;
// serializer for keys (persistent map and broadcast messages)
private final DataType keyType;
// serializer for values (persistent map and broadcast messages)
private final DataType valueType;
// per-key bookkeeping for in-memory entries; disabled in synchronous mode
private final CacheMetadata<K> memCacheMetadata;
// passed to CacheType.shouldCache() when filtering keys
private final DocumentNodeStore nodeStore;
// true if persistent writes are queued on eviction, false if written immediately
private final boolean async;
// queue for asynchronous persistent writes; null in synchronous mode
CacheWriteQueue<K, V> writeQueue;
/**
 * Creates a node cache that wraps the given in-memory cache.
 *
 * @param cache the owning persistent cache
 * @param memCache the in-memory cache to front the persistent map
 * @param docNodeStore the node store, used for value (de)serialization
 *            and for the {@code shouldCache} filter
 * @param docStore the document store, used for value (de)serialization
 * @param type the cache type
 * @param dispatcher the dispatcher backing the write queue (only used
 *            in asynchronous mode)
 * @param statisticsProvider the provider used to create the statistics
 * @param async whether persistent writes are asynchronous
 */
NodeCache(
        PersistentCache cache,
        Cache<K, V> memCache,
        DocumentNodeStore docNodeStore,
        DocumentStore docStore,
        CacheType type,
        CacheActionDispatcher dispatcher,
        StatisticsProvider statisticsProvider,
        boolean async) {
    this.cache = cache;
    this.memCache = memCache;
    this.type = type;
    this.nodeStore = docNodeStore;
    this.async = async;
    PersistentCache.LOG.info("wrapping map " + this.type);

    this.map = new MultiGenerationMap<K, V>();
    this.keyType = new KeyDataType(type);
    this.valueType = new ValueDataType(docNodeStore, docStore, type);
    this.memCacheMetadata = new CacheMetadata<K>();

    if (!async) {
        // synchronous mode: no write queue, and no metadata tracking
        this.writeQueue = null;
        this.memCacheMetadata.disable();
        LOG.info("The persistent cache {} writes will be synchronous", type);
    } else {
        this.writeQueue = new CacheWriteQueue<K, V>(dispatcher, cache, this.map);
        LOG.info("The persistent cache {} writes will be asynchronous", type);
    }

    this.stats = new PersistentCacheStats(type, statisticsProvider);
}
/**
 * Returns the cache type this instance was created with.
 */
@Override
public CacheType getType() {
return type;
}
/**
 * Opens the map of the given generation and registers it for reading.
 * Unless the generation is read-only, it also becomes the map that
 * receives writes, and the statistics are told about the new write
 * generation.
 *
 * @param generation the generation to add
 * @param readOnly whether the generation must not be written to
 */
@Override
public void addGeneration(int generation, boolean readOnly) {
    MVMap.Builder<K, V> builder = new MVMap.Builder<K, V>()
            .keyType(keyType)
            .valueType(valueType);
    CacheMap<K, V> generationMap =
            cache.openMap(generation, type.getMapName(), builder);
    map.addReadMap(generation, generationMap);
    if (readOnly) {
        return;
    }
    map.setWriteMap(generationMap);
    stats.addWriteGeneration(generation);
}
/**
 * Unregisters the read map of the given generation and informs the
 * statistics that the generation is no longer readable.
 *
 * @param generation the generation to remove
 */
@Override
public void removeGeneration(int generation) {
map.removeReadMap(generation);
stats.removeReadGeneration(generation);
}
/**
 * Reads a value from the persistent cache, using the read strategy that
 * matches the configured write mode.
 *
 * @param key the key to look up
 * @return the persisted value, or {@code null} if there is none
 */
private V readIfPresent(K key) {
    if (async) {
        return asyncReadIfPresent(key);
    }
    return syncReadIfPresent(key);
}
/**
 * Reads a value from the persistent map in synchronous mode.
 * <p>
 * The generation is switched first if needed. When a value is found, the
 * in-memory metadata records that the entry came from the persistent
 * cache.
 *
 * @param key the key to look up
 * @return the persisted value, or {@code null} if there is none
 */
private V syncReadIfPresent(K key) {
    cache.switchGenerationIfNeeded();
    TimerStats.Context ctx = stats.startReadTimer();
    V v;
    try {
        v = map.get(key);
    } finally {
        // always stop the timer, even if the lookup throws; this is
        // the same guarantee asyncReadIfPresent gives
        ctx.stop();
    }
    if (v != null) {
        memCacheMetadata.putFromPersistenceAndIncrement(key);
    }
    return v;
}
/**
 * Reads a value from the persistent map in asynchronous mode and updates
 * the in-memory metadata so that a later eviction knows whether the
 * entry has to be persisted again.
 *
 * @param key the key to look up
 * @return the persisted value, or {@code null} if there is none
 */
private V asyncReadIfPresent(K key) {
    TimerStats.Context timer = stats.startReadTimer();
    try {
        MultiGenerationMap.ValueWithGenerationInfo<V> found = map.readValue(key);
        if (found == null) {
            return null;
        }
        boolean upToDate = found.isCurrentGeneration() && !cache.needSwitch();
        if (!upToDate) {
            // persist again during eviction
            memCacheMetadata.increment(key);
        } else {
            // don't persist again on eviction
            memCacheMetadata.putFromPersistenceAndIncrement(key);
        }
        return found.getValue();
    } finally {
        timer.stop();
    }
}
/**
 * Broadcasts a change of the given entry through the persistent cache.
 * <p>
 * The message consists of the serialized key, followed by a marker byte
 * ({@code 0} = no value, {@code 1} = value follows) and, if present, the
 * serialized value.
 *
 * @param key the affected key
 * @param value the new value, or {@code null} if there is none
 */
private void broadcast(final K key, final V value) {
    final boolean hasValue = value != null;
    final Function<WriteBuffer, Void> serializer = new Function<WriteBuffer, Void>() {
        @Override
        @Nullable
        public Void apply(@Nullable WriteBuffer buffer) {
            keyType.write(buffer, key);
            buffer.put((byte) (hasValue ? 1 : 0));
            if (hasValue) {
                valueType.write(buffer, value);
            }
            return null;
        }
    };
    cache.broadcast(type, serializer);
}
/**
 * Writes an entry directly to the persistent map, switching the
 * generation first if needed.
 * <p>
 * A {@code null} value removes the key. A non-null value is stored only
 * if the cache type accepts the key; in that case the written bytes and
 * the put are recorded in the statistics.
 *
 * @param key the key to write
 * @param value the value to store, or {@code null} to remove the key
 */
private void write(final K key, final V value) {
    cache.switchGenerationIfNeeded();
    if (value == null) {
        map.remove(key);
        return;
    }
    if (!type.shouldCache(nodeStore, key)) {
        // key excluded by configuration
        return;
    }
    map.put(key, value);
    long bytes = (key != null) ? keyType.getMemory(key) : 0L;
    bytes += valueType.getMemory(value);
    stats.markBytesWritten(bytes);
    stats.markPut();
}
/**
 * Returns the value for the key from the in-memory cache, falling back
 * to the persistent cache on a miss. A value found in the persistent
 * cache is put back into the in-memory cache.
 *
 * @param key the key to look up
 * @return the cached value, or {@code null} if neither cache has it
 */
@SuppressWarnings("unchecked")
@Override
@Nullable
public V getIfPresent(Object key) {
    final K typedKey = (K) key;
    memCacheMetadata.increment(typedKey);
    final V inMemory = memCache.getIfPresent(key);
    if (inMemory != null) {
        return inMemory;
    }
    // not in memory: drop the metadata that was just touched
    memCacheMetadata.remove(key);

    // do not bother the persistent cache if the
    // key was excluded by configuration
    if (!type.shouldCache(nodeStore, key)) {
        return null;
    }

    stats.markRequest();
    // the read takes care of updating memCacheMetadata
    final V persisted = readIfPresent(typedKey);
    if (persisted == null) {
        return null;
    }
    memCache.put(typedKey, persisted);
    stats.markHit();
    return persisted;
}
/**
 * Returns the value for the key, loading it with the given loader if
 * neither the in-memory nor the persistent cache has it.
 * <p>
 * A freshly loaded value is written to the persistent map right away in
 * synchronous mode and is broadcast in both modes. The loader timer is
 * stopped only when the load succeeds.
 *
 * @param key the key to look up
 * @param valueLoader the loader invoked on a miss
 * @return the cached or loaded value
 * @throws ExecutionException if the in-memory cache reports a failed load
 */
@Override
public V get(K key,
        Callable<? extends V> valueLoader)
        throws ExecutionException {
    // Get stats covered in getIfPresent
    V cached = getIfPresent(key);
    if (cached != null) {
        return cached;
    }

    // Track entry load time
    TimerStats.Context loadTimer = stats.startLoaderTimer();
    try {
        memCacheMetadata.increment(key);
        V loaded = memCache.get(key, valueLoader);
        loadTimer.stop();
        if (!async) {
            write(key, loaded);
        }
        broadcast(key, loaded);
        return loaded;
    } catch (ExecutionException e) {
        stats.markException();
        throw e;
    }
}
/**
 * Returns the entries for the given keys that are present in the
 * in-memory cache. The persistent cache is not consulted.
 * <p>
 * The metadata of every requested key is incremented first; the metadata
 * of keys that turned out not to be in memory is removed again.
 *
 * @param keys the keys to look up
 * @return the entries found in the in-memory cache
 */
@Override
public ImmutableMap<K, V> getAllPresent(
        Iterable<?> keys) {
    // The Cache interface only provides Iterable<?>; the typed view is
    // needed for the metadata clean-up below, hence the unchecked cast.
    @SuppressWarnings("unchecked")
    Iterable<K> typedKeys = (Iterable<K>) keys;
    memCacheMetadata.incrementAll(keys);
    ImmutableMap<K, V> result = memCache.getAllPresent(keys);
    memCacheMetadata.removeAll(filter(typedKeys, not(in(result.keySet()))));
    return result;
}
/**
 * Stores the entry in the in-memory cache and broadcasts it. In
 * synchronous mode the entry is also written to the persistent map
 * before the broadcast.
 *
 * @param key the key
 * @param value the value
 */
@Override
public void put(K key, V value) {
    memCacheMetadata.put(key);
    memCache.put(key, value);
    final boolean writeThrough = !async;
    if (writeThrough) {
        write(key, value);
    }
    broadcast(key, value);
}
/**
 * Removes the key from the in-memory cache and from the persistent
 * cache, then broadcasts the removal.
 * <p>
 * In asynchronous mode the persistent removal is queued; in synchronous
 * mode it is performed immediately.
 *
 * @param key the key to invalidate
 */
@SuppressWarnings("unchecked")
@Override
public void invalidate(Object key) {
    memCache.invalidate(key);
    memCacheMetadata.remove(key);

    final K typedKey = (K) key;
    if (!async) {
        write(typedKey, null);
    } else {
        writeQueue.addInvalidate(singleton(typedKey));
    }
    broadcast(typedKey, null);
    stats.markInvalidateOne();
}
/**
 * Stores all given entries in the in-memory cache and registers their
 * keys in the in-memory metadata.
 * <p>
 * NOTE(review): unlike {@link #put}, this neither writes to the
 * persistent map nor broadcasts the entries - confirm this is intended.
 *
 * @param m the entries to store
 */
@Override
public void putAll(Map<? extends K, ? extends V> m) {
memCacheMetadata.putAll(m.keySet());
memCache.putAll(m);
}
/**
 * Removes the given keys from the in-memory cache and from the in-memory
 * metadata.
 * <p>
 * NOTE(review): unlike {@link #invalidate(Object)}, this does not remove
 * the keys from the persistent map, does not use the write queue and
 * does not broadcast - confirm this is intended.
 *
 * @param keys the keys to invalidate
 */
@Override
public void invalidateAll(Iterable<?> keys) {
memCache.invalidateAll(keys);
memCacheMetadata.removeAll(keys);
}
/**
 * Discards all entries: the in-memory cache, the in-memory metadata and
 * the persistent multi-generation map are cleared, and the operation is
 * recorded in the statistics.
 * <p>
 * NOTE(review): nothing is broadcast here - confirm this is intended.
 */
@Override
public void invalidateAll() {
memCache.invalidateAll();
memCacheMetadata.clear();
map.clear();
stats.markInvalidateAll();
}
/**
 * Returns the size reported by the in-memory cache; the persistent map
 * is not taken into account.
 */
@Override
public long size() {
return memCache.size();
}
/**
 * Returns the statistics of the in-memory cache. The persistent cache
 * statistics are available via {@link #getPersistentCacheStats()}.
 */
@Override
public CacheStats stats() {
return memCache.stats();
}
/**
 * Returns the map view of the in-memory cache; entries that exist only
 * in the persistent map are not part of it.
 */
@Override
public ConcurrentMap<K, V> asMap() {
return memCache.asMap();
}
/**
 * Delegates to the in-memory cache's {@code cleanUp()} and then clears
 * the in-memory metadata.
 * <p>
 * NOTE(review): clearing the metadata presumably also drops the
 * bookkeeping of entries that are still in the in-memory cache - confirm
 * this is intended.
 */
@Override
public void cleanUp() {
memCache.cleanUp();
memCacheMetadata.clear();
}
/**
 * Applies a broadcast message to this cache.
 * <p>
 * The buffer holds a serialized key, a marker byte and - if the marker is
 * not {@code 0} - a serialized value. A marker of {@code 0} invalidates
 * the key in memory; otherwise the entry is put into the in-memory
 * cache. In synchronous mode the change is also applied to the
 * persistent map.
 *
 * @param buff the received message
 */
@Override
@SuppressWarnings("unchecked")
public void receive(ByteBuffer buff) {
    final K key = (K) keyType.read(buff);
    final boolean hasValue = buff.get() != 0;
    final V value;
    if (hasValue) {
        value = (V) valueType.read(buff);
        memCacheMetadata.put(key);
        memCache.put(key, value);
    } else {
        value = null;
        memCache.invalidate(key);
        memCacheMetadata.remove(key);
    }
    stats.markRecvBroadcast();
    if (async) {
        return;
    }
    write(key, value);
}
/**
 * Invoked on the eviction from the {@link #memCache}.
 * <p>
 * Only relevant in asynchronous mode, for non-null values removed because
 * of {@code COLLECTED}, {@code EXPIRED} or {@code SIZE}. The entry is
 * queued for a persistent write unless it was read from the persistent
 * cache, was never accessed, or is rejected by the cache type. Each
 * outcome is recorded in the statistics.
 *
 * @param key the evicted key
 * @param value the evicted value
 * @param cause the reason for the removal
 */
@Override
public void evicted(K key, V value, RemovalCause cause) {
    if (!async || !EVICTION_CAUSES.contains(cause) || value == null) {
        return;
    }

    CacheMetadata.MetadataEntry entry = memCacheMetadata.remove(key);
    if (entry != null && entry.isReadFromPersistentCache()) {
        stats.markPutRejectedAlreadyPersisted();
        return;
    }
    if (entry != null && entry.getAccessCount() < 1) {
        stats.markPutRejectedEntryNotUsed();
        return;
    }
    if (!type.shouldCache(nodeStore, key)) {
        stats.markPutRejectedAsCachedInSecondary();
        return;
    }

    if (!writeQueue.addPut(key, value)) {
        stats.markPutRejectedQueueFull();
        return;
    }
    long bytes = (key != null) ? keyType.getMemory(key) : 0L;
    bytes += valueType.getMemory(value);
    stats.markBytesWritten(bytes);
    stats.markPut();
}
/**
 * Returns the statistics collected for the persistent part of this
 * cache (as opposed to {@link #stats()}, which covers the in-memory
 * cache).
 */
public PersistentCacheStats getPersistentCacheStats() {
return stats;
}
/**
 * Returns an unmodifiable view of the persistent multi-generation map.
 */
Map<K, V> getGenerationalMap() {
return Collections.unmodifiableMap(map);
}
}