blob: a6227debb12ef950bc3ce17072435598e36e6c96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.persistentCache;
import java.io.File;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.cache.CacheValue;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.UDPBroadcaster;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.h2.mvstore.FileStore;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMap.Builder;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVStoreTool;
import org.h2.mvstore.WriteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
/**
* A persistent cache for the document store.
*/
public class PersistentCache implements Broadcaster.Listener {
static final Logger LOG = LoggerFactory.getLogger(PersistentCache.class);
private static final String FILE_PREFIX = "cache-";
private static final String FILE_SUFFIX = ".data";
private static final AtomicInteger COUNTER = new AtomicInteger();
private boolean cacheNodes = true;
private boolean cacheChildren = true;
private boolean cacheDiff = true;
private boolean cacheLocalDiff = true;
private boolean cachePrevDocs = true;
private boolean compactOnClose;
private boolean compress = true;
private boolean asyncCache = true;
private boolean asyncDiffCache = false;
private HashMap<CacheType, GenerationCache> caches =
new HashMap<CacheType, GenerationCache>();
private final String directory;
private MapFactory writeStore;
private MapFactory readStore;
private int maxSizeMB = 1024;
private int memCache = -1;
private int readGeneration = -1;
private int writeGeneration;
private int autoCompact = 0;
private boolean appendOnly;
private boolean manualCommit;
private Broadcaster broadcaster;
private ThreadLocal<WriteBuffer> writeBuffer = new ThreadLocal<WriteBuffer>();
private final byte[] broadcastId;
private DynamicBroadcastConfig broadcastConfig;
private CacheActionDispatcher writeDispatcher;
private Thread writeDispatcherThread;
{
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
UUID uuid = UUID.randomUUID();
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
broadcastId = bb.array();
}
private int exceptionCount;
public PersistentCache(String url) {
LOG.info("start, url={}", url);
String[] parts = url.split(",");
String dir = parts[0];
String broadcast = "disabled";
for (String p : parts) {
if (p.equals("+docs")) {
logUnsupportedWarning("docs");
} else if (p.equals("-prevDocs")) {
cachePrevDocs = false;
} else if (p.equals("+docChildren")) {
logUnsupportedWarning("docChildren");
} else if (p.equals("-nodes")) {
cacheNodes = false;
} else if (p.equals("-children")) {
cacheChildren = false;
} else if (p.equals("-diff")) {
cacheDiff = false;
} else if (p.equals("-localDiff")) {
cacheLocalDiff = false;
} else if (p.equals("+all")) {
logUnsupportedWarning("all");
} else if (p.equals("-compact")) {
compactOnClose = false;
} else if (p.equals("+compact")) {
compactOnClose = true;
} else if (p.equals("-compress")) {
compress = false;
} else if (p.endsWith("time")) {
dir += "-" + System.currentTimeMillis() + "-" + COUNTER.getAndIncrement();
} else if (p.startsWith("size=")) {
maxSizeMB = Integer.parseInt(p.split("=")[1]);
} else if (p.startsWith("memCache=")) {
memCache = Integer.parseInt(p.split("=")[1]);
} else if (p.startsWith("binary=")) {
logUnsupportedWarning("binary");
} else if (p.startsWith("autoCompact=")) {
autoCompact = Integer.parseInt(p.split("=")[1]);
} else if (p.equals("appendOnly")) {
appendOnly = true;
} else if (p.equals("manualCommit")) {
manualCommit = true;
} else if (p.startsWith("broadcast=")) {
broadcast = p.split("=")[1];
} else if (p.equals("-async")) {
asyncCache = false;
} else if (p.equals("+asyncDiff")) {
asyncDiffCache = true;
}
}
this.directory = dir;
if (dir.length() == 0) {
readGeneration = -1;
writeGeneration = 0;
writeStore = createMapFactory(writeGeneration, false);
return;
}
File dr = new File(dir);
if (!dr.exists()) {
dr.mkdirs();
}
if (dr.exists() && !dr.isDirectory()) {
throw new IllegalArgumentException("A file exists at cache directory " + dir);
}
File[] list = dr.listFiles();
TreeSet<Integer> generations = new TreeSet<Integer>();
if (list != null) {
for (File f : list) {
String fn = f.getName();
if (fn.startsWith(FILE_PREFIX) && fn.endsWith(FILE_SUFFIX)) {
String g = fn.substring(FILE_PREFIX.length(), fn.indexOf(FILE_SUFFIX));
try {
int gen = Integer.parseInt(g);
if (gen >= 0) {
File f2 = new File(getFileName(gen));
if (fn.equals(f2.getName())) {
// ignore things like "cache-000.data"
generations.add(gen);
}
}
} catch (Exception e) {
// ignore this file
}
}
}
}
while (generations.size() > 2) {
Integer oldest = generations.first();
File oldFile = new File(getFileName(oldest));
if (!oldFile.canWrite()) {
LOG.info("Ignoring old, read-only generation " + oldFile.getAbsolutePath());
} else {
LOG.info("Removing old generation " + oldFile.getAbsolutePath());
oldFile.delete();
}
generations.remove(oldest);
}
try {
readGeneration = generations.size() > 1 ? generations.first() : -1;
writeGeneration = generations.size() > 0 ? generations.last() : 0;
if (readGeneration >= 0) {
readStore = createMapFactory(readGeneration, true);
}
writeStore = createMapFactory(writeGeneration, false);
initBroadcast(broadcast);
writeDispatcher = new CacheActionDispatcher();
writeDispatcherThread = new Thread(writeDispatcher, "Oak CacheWriteQueue");
writeDispatcherThread.setDaemon(true);
writeDispatcherThread.start();
} catch (RuntimeException ex) {
// OAK-8052: cleanup stores in case of failure
LOG.error("Exception during PersistentCache instantiation for {}.", url);
close();
}
}
private void logUnsupportedWarning(String configKey) {
LOG.warn("Support for '{}' has been removed from persistent cache. " +
"Please update the configuration.", configKey);
}
private void initBroadcast(String broadcast) {
if (broadcast == null) {
return;
}
if (broadcast.equals("disabled")) {
return;
} else if (broadcast.equals("inMemory")) {
broadcaster = InMemoryBroadcaster.INSTANCE;
} else if (broadcast.startsWith("udp:")) {
String config = broadcast.substring("udp:".length(), broadcast.length());
broadcaster = new UDPBroadcaster(config);
} else if (broadcast.startsWith("tcp:")) {
String config = broadcast.substring("tcp:".length(), broadcast.length());
broadcaster = new TCPBroadcaster(config);
} else {
throw new IllegalArgumentException("Unknown broadcaster type " + broadcast);
}
broadcaster.addListener(this);
}
private String getFileName(int generation) {
if (directory.length() == 0) {
return null;
}
return directory + "/" + FILE_PREFIX + generation + FILE_SUFFIX;
}
private MapFactory createMapFactory(final int generation, final boolean readOnly) {
MapFactory f = new MapFactory() {
final String fileName = getFileName(generation);
MVStore store;
@Override
void openStore() {
if (store != null) {
return;
}
MVStore.Builder builder = new MVStore.Builder();
try {
if (compress) {
builder.compress();
}
if (manualCommit) {
builder.autoCommitDisabled();
}
if (fileName != null) {
builder.fileName(fileName);
}
if (memCache >= 0) {
builder.cacheSize(memCache);
}
if (readOnly) {
builder.readOnly();
}
if (maxSizeMB < 10) {
builder.cacheSize(maxSizeMB);
}
if (autoCompact >= 0) {
builder.autoCompactFillRate(autoCompact);
}
builder.backgroundExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
exceptionCount++;
LOG.debug("Error in the background thread of the persistent cache", e);
LOG.warn("Error in the background thread of the persistent cache: " + e);
}
});
store = builder.open();
if (appendOnly) {
store.setReuseSpace(false);
}
} catch (Exception e) {
exceptionCount++;
LOG.warn("Could not open the store " + fileName, e);
}
}
@Override
synchronized void closeStore() {
if (store == null) {
return;
}
boolean compact = compactOnClose;
try {
if (store.getFileStore().isReadOnly()) {
compact = false;
}
// clear the interrupted flag, if set
Thread.interrupted();
store.close();
} catch (Exception e) {
exceptionCount++;
LOG.debug("Could not close the store", e);
LOG.warn("Could not close the store: " + e);
store.closeImmediately();
}
if (compact) {
try {
MVStoreTool.compact(fileName, true);
} catch (Exception e) {
exceptionCount++;
LOG.debug("Could not compact the store", e);
LOG.warn("Could not compact the store: " + e);
}
}
store = null;
}
@Override
<K, V> Map<K, V> openMap(String name, Builder<K, V> builder) {
try {
if (builder == null) {
return store.openMap(name);
}
return store.openMap(name, builder);
} catch (Exception e) {
exceptionCount++;
LOG.warn("Could not open the map", e);
return null;
}
}
@Override
long getFileSize() {
try {
if (store == null) {
return 0;
}
FileStore fs = store.getFileStore();
if (fs == null) {
return 0;
}
return fs.size();
} catch (Exception e) {
exceptionCount++;
LOG.warn("Could not retrieve the map size", e);
return 0;
}
}
};
f.openStore();
return f;
}
public void close() {
writeDispatcher.stop();
try {
writeDispatcherThread.join();
} catch (InterruptedException e) {
LOG.error("Can't join the {}", writeDispatcherThread.getName(), e);
}
if (writeStore != null) {
writeStore.closeStore();
}
if (readStore != null) {
readStore.closeStore();
}
if (broadcaster != null) {
broadcaster.removeListener(this);
broadcaster.close();
broadcaster = null;
}
writeBuffer.remove();
}
public synchronized <K extends CacheValue, V extends CacheValue> Cache<K, V> wrap(
DocumentNodeStore docNodeStore,
DocumentStore docStore,
Cache<K, V> base, CacheType type) {
return wrap(docNodeStore, docStore, base, type, StatisticsProvider.NOOP);
}
public synchronized <K extends CacheValue, V extends CacheValue> Cache<K, V> wrap(
DocumentNodeStore docNodeStore,
DocumentStore docStore,
Cache<K, V> base, CacheType type,
StatisticsProvider statisticsProvider) {
boolean wrap;
boolean async = asyncCache;
switch (type) {
case NODE:
wrap = cacheNodes;
break;
case CHILDREN:
wrap = cacheChildren;
break;
case DIFF:
wrap = cacheDiff;
async = asyncDiffCache;
break;
case LOCAL_DIFF:
wrap = cacheLocalDiff;
async = asyncDiffCache;
break;
case PREV_DOCUMENT:
wrap = cachePrevDocs;
break;
default:
wrap = false;
break;
}
if (wrap) {
NodeCache<K, V> c = new NodeCache<K, V>(this,
base, docNodeStore, docStore,
type, writeDispatcher, statisticsProvider, async);
initGenerationCache(c);
return c;
}
return base;
}
private void initGenerationCache(GenerationCache c) {
caches.put(c.getType(), c);
if (readGeneration >= 0) {
c.addGeneration(readGeneration, true);
}
c.addGeneration(writeGeneration, false);
}
public synchronized <K, V> CacheMap<K, V> openMap(int generation, String name,
MVMap.Builder<K, V> builder) {
MapFactory s;
if (generation == readGeneration) {
s = readStore;
} else if (generation == writeGeneration) {
s = writeStore;
} else {
exceptionCount++;
throw new IllegalArgumentException("Unknown generation: " + generation);
}
return new CacheMap<K, V>(s, name, builder);
}
public void switchGenerationIfNeeded() {
if (!needSwitch()) {
return;
}
synchronized (this) {
// maybe another thread already switched,
// so we need to check again
if (!needSwitch()) {
return;
}
int oldReadGeneration = readGeneration;
MapFactory oldRead = readStore;
readStore = writeStore;
readGeneration = writeGeneration;
MapFactory w = createMapFactory(writeGeneration + 1, false);
writeStore = w;
writeGeneration++;
for (GenerationCache c : caches.values()) {
c.addGeneration(writeGeneration, false);
if (oldReadGeneration >= 0) {
c.removeGeneration(oldReadGeneration);
}
}
if (oldRead != null) {
oldRead.closeStore();
new File(getFileName(oldReadGeneration)).delete();
}
}
}
boolean needSwitch() {
long size = writeStore.getFileSize();
if (size / 1024 / 1024 <= maxSizeMB) {
return false;
}
return true;
}
public int getMaxSize() {
return maxSizeMB;
}
public int getOpenCount() {
return writeStore.getOpenCount();
}
public int getExceptionCount() {
return exceptionCount;
}
void broadcast(CacheType type, Function<WriteBuffer, Void> writer) {
Broadcaster b = broadcaster;
if (b == null) {
return;
}
WriteBuffer buff = writeBuffer.get();
if (buff == null) {
buff = new WriteBuffer();
writeBuffer.set(buff);
}
buff.clear();
// space for the length
buff.putInt(0);
buff.put(broadcastId);
buff.put((byte) type.ordinal());
writer.apply(buff);
ByteBuffer byteBuff = buff.getBuffer();
int length = byteBuff.position();
((Buffer)byteBuff).limit(length);
// write length
byteBuff.putInt(0, length);
((Buffer)byteBuff).position(0);
b.send(byteBuff);
}
@Override
public void receive(ByteBuffer buff) {
int end = buff.position() + buff.getInt();
byte[] id = new byte[broadcastId.length];
buff.get(id);
if (!Arrays.equals(id, broadcastId)) {
// process only messages from other senders
receiveMessage(buff);
}
((Buffer)buff).position(end);
}
public static PersistentCacheStats getPersistentCacheStats(Cache<?, ?> cache) {
if (cache instanceof NodeCache) {
return ((NodeCache<?, ?>) cache).getPersistentCacheStats();
}
else {
return null;
}
}
private void receiveMessage(ByteBuffer buff) {
CacheType type = CacheType.VALUES[buff.get()];
GenerationCache cache = caches.get(type);
if (cache == null) {
return;
}
cache.receive(buff);
}
public DynamicBroadcastConfig getBroadcastConfig() {
return broadcastConfig;
}
public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
this.broadcastConfig = broadcastConfig;
if (broadcaster != null) {
broadcaster.setBroadcastConfig(broadcastConfig);
}
}
interface GenerationCache {
void addGeneration(int writeGeneration, boolean b);
CacheType getType();
void receive(ByteBuffer buff);
void removeGeneration(int oldReadGeneration);
}
}