blob: 84f28d1317e0612128898963855feb384c17df3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.cache.AbstractCache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.Weigher;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.cache.CacheLIRS;
import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.io.FilenameUtils.normalizeNoEndSeparator;
import static org.apache.jackrabbit.oak.commons.FileIOUtils.copyInputStreamToFile;
/**
*/
public class FileCache extends AbstractCache<String, File> implements Closeable {
/**
* Logger instance.
*/
private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
private static final int SEGMENT_COUNT = Integer.getInteger("oak.blob.fileCache.segmentCount", 1);
protected static final String DOWNLOAD_DIR = "download";
/**
* Parent of the cache root directory
*/
private File parent;
/**
* The cacheRoot directory of the cache.
*/
private File cacheRoot;
private CacheLIRS<String, File> cache;
private FileCacheStats cacheStats;
private ExecutorService executor;
private CacheLoader<String, File> cacheLoader;
/**
* Convert the size calculation to KB to support max file size of 2 TB
*/
private static final Weigher<String, File> weigher = new Weigher<String, File>() {
@Override public int weigh(String key, File value) {
// convert to number of 4 KB blocks
return Math.round(value.length() / (4 * 1024));
}};
//Rough estimate of the in-memory key, value pair
private static final Weigher<String, File> memWeigher = new Weigher<String, File>() {
@Override public int weigh(String key, File value) {
return (StringUtils.estimateMemoryUsage(key) +
StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
}};
private FileCache(long maxSize /* bytes */, File root,
final CacheLoader<String, InputStream> loader, @Nullable final ExecutorService executor) {
this.parent = root;
this.cacheRoot = new File(root, DOWNLOAD_DIR);
// convert to number of 4 KB blocks
long size = Math.round(maxSize / (1024L * 4));
cacheLoader = new CacheLoader<String, File>() {
@Override public File load(String key) throws Exception {
// Fetch from local cache directory and if not found load from backend
File cachedFile = DataStoreCacheUtils.getFile(key, cacheRoot);
if (cachedFile.exists()) {
return cachedFile;
} else {
InputStream is = null;
boolean threw = true;
try {
is = loader.load(key);
copyInputStreamToFile(is, cachedFile);
threw = false;
} catch (Exception e) {
LOG.warn("Error reading object for id [{}] from backend", key, e);
throw e;
} finally {
Closeables.close(is, threw);
}
return cachedFile;
}
}
};
cache = new CacheLIRS.Builder<String, File>()
.maximumWeight(size)
.recordStats()
.weigher(weigher)
.segmentCount(SEGMENT_COUNT)
.evictionCallback(new EvictionCallback<String, File>() {
@Override
public void evicted(@NotNull String key, @Nullable File cachedFile,
@NotNull RemovalCause cause) {
try {
if (cachedFile != null && cachedFile.exists()
&& cause != RemovalCause.REPLACED) {
DataStoreCacheUtils.recursiveDelete(cachedFile, cacheRoot);
LOG.info("File [{}] evicted with reason [{}]", cachedFile, cause
.toString());
}
} catch (IOException e) {
LOG.info("Cached file deletion failed after eviction", e);
}
}})
.build();
this.cacheStats =
new FileCacheStats(cache, weigher, memWeigher, maxSize);
// TODO: Check persisting the in-memory map and initializing Vs building from fs
// Build in-memory cache asynchronously from the file system entries
if (executor == null) {
this.executor = Executors.newSingleThreadExecutor();
} else {
this.executor = executor;
}
this.executor.submit(new CacheBuildJob());
}
private FileCache() {
}
public static FileCache build(long maxSize /* bytes */, File root,
final CacheLoader<String, InputStream> loader, @Nullable final ExecutorService executor) {
if (maxSize > 0) {
return new FileCache(maxSize, root, loader, executor);
}
return new FileCache() {
@Override public void put(String key, File file) {
}
@Override public boolean containsKey(String key) {
return false;
}
@Nullable @Override public File getIfPresent(String key) {
return null;
}
@Override public File get(String key) throws IOException {
return null;
}
@Override public void invalidate(Object key) {
}
@Override public DataStoreCacheStatsMBean getStats() {
return new FileCacheStats(this, weigher, memWeigher, 0);
}
@Override public void close() {
}
};
}
/**
* Puts the given key and file into the cache.
* The file is moved to the cache. So, the original file
* won't be available after this operation. It can be retrieved
* using {@link #getIfPresent(String)}.
*
* @param key of the file
* @param file to put into cache
*/
@Override
public void put(String key, File file) {
put(key, file, true);
}
private void put(String key, File file, boolean copy) {
try {
File cached = DataStoreCacheUtils.getFile(key, cacheRoot);
if (!cached.exists()) {
if (copy) {
FileUtils.copyFile(file, cached);
} else {
FileUtils.moveFile(file, cached);
}
}
cache.put(key, cached);
} catch (IOException e) {
LOG.error("Exception adding id [{}] with file [{}] to cache, root cause: {}", key, file, e.getMessage());
LOG.debug("Root cause", e);
}
}
public boolean containsKey(String key) {
return cache.containsKey(key);
}
/**
* Retrieves the file handle from the cache if present and null otherwise.
*
* @param key of the file to retrieve
* @return File handle if available
*/
@Nullable
public File getIfPresent(String key) {
try {
return cache.getIfPresent(key);
} catch (Exception e) {
LOG.error("Error in retrieving [{}] from cache", key, e);
}
return null;
}
@Nullable
@Override
public File getIfPresent(Object key) {
return getIfPresent((String) key);
}
public File get(String key) throws IOException {
try {
// get from cache and download if not available
return cache.get(key, () -> cacheLoader.load(key));
} catch (ExecutionException e) {
LOG.error("Error loading [{}] from cache", key);
throw new IOException(e);
}
}
@Override
public void invalidate(Object key) {
cache.invalidate(key);
}
public DataStoreCacheStatsMBean getStats() {
return cacheStats;
}
@Override
public void close() {
LOG.info("Cache stats on close [{}]", cacheStats.cacheInfoAsString());
new ExecutorCloser(executor).close();
}
/**
* Called to initialize the in-memory cache from the fs folder
*/
private class CacheBuildJob implements Callable {
@Override
public Integer call() {
Stopwatch watch = Stopwatch.createStarted();
int count = build();
LOG.info("Cache built with [{}] files from file system in [{}] seconds",
count, watch.elapsed(TimeUnit.SECONDS));
return count;
}
}
/**
* Retrieves all the files present in the fs cache folder and builds the in-memory cache.
*/
private int build() {
int count = 0;
// Move older generation cache downloaded files to the new folder
DataStoreCacheUpgradeUtils.moveDownloadCache(parent);
// Iterate over all files in the cache folder
Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(cacheRoot)
.filter(new Predicate<File>() {
@Override public boolean apply(File input) {
return input.isFile() && !normalizeNoEndSeparator(input.getParent())
.equals(cacheRoot.getAbsolutePath());
}
}).iterator();
while (iter.hasNext()) {
File toBeSyncedFile = iter.next();
try {
put(toBeSyncedFile.getName(), toBeSyncedFile, false);
count++;
LOG.trace("Added file [{}} to in-memory cache", toBeSyncedFile);
} catch (Exception e) {
LOG.error("Error in putting cached file in map[{}]", toBeSyncedFile);
}
}
LOG.trace("[{}] files put in im-memory cache", count);
return count;
}
}
/**
 * Statistics for the file cache, registered under the name "DataStore-DownloadCache".
 * <p>
 * In addition to the inherited {@link CacheStats} figures, this class estimates two sizes.
 * The first is the on-disk size of the cached files, computed from the per-entry block weights.
 * The second is the heap used by the cache's keys and values, computed with the memory weigher.
 */
class FileCacheStats extends CacheStats implements DataStoreCacheStatsMBean {
    /** Bytes represented by one unit of the file weigher (one 4 KB block). */
    private static final long BLOCK_SIZE = 4 * 1024;

    private final Weigher<Object, Object> memWeigher;

    private final Weigher<Object, Object> weigher;

    private final Cache<Object, Object> cache;

    /**
     * Construct the cache stats object.
     * @param cache the cache whose entries are reported
     * @param weigher weigher returning the number of 4 KB blocks per entry, or {@code null}
     * @param memWeigher weigher estimating the heap bytes per entry, or {@code null}
     * @param maxWeight the maximum weight
     */
    @SuppressWarnings("unchecked") // the weighers are only ever applied to this cache's entries
    public FileCacheStats(Cache<?, ?> cache, Weigher<?, ?> weigher, Weigher<?, ?> memWeigher,
            long maxWeight) {
        super(cache, "DataStore-DownloadCache", weigher, maxWeight);
        this.memWeigher = (Weigher<Object, Object>) memWeigher;
        this.weigher = (Weigher<Object, Object>) weigher;
        this.cache = (Cache<Object, Object>) cache;
    }

    /**
     * @return estimated heap usage of all entries in bytes, or -1 if no memory weigher is set
     */
    @Override
    public long estimateCurrentMemoryWeight() {
        return sumWeights(memWeigher, 1L);
    }

    /**
     * @return estimated size of all cached files in bytes (block weight times 4 KB), or -1 if
     *         no weigher is set
     */
    @Override
    public long estimateCurrentWeight() {
        return sumWeights(weigher, BLOCK_SIZE);
    }

    /**
     * Sums {@code w.weigh(key, value) * unit} over all current cache entries.
     *
     * @param w weigher to apply; may be {@code null}
     * @param unit multiplier applied to each entry's weight
     * @return the total, or -1 when {@code w} is {@code null}
     */
    private long sumWeights(Weigher<Object, Object> w, long unit) {
        if (w == null) {
            return -1;
        }
        long total = 0;
        for (Map.Entry<Object, Object> e : cache.asMap().entrySet()) {
            // int * long: the product is computed in long arithmetic, no int overflow
            total += w.weigh(e.getKey(), e.getValue()) * unit;
        }
        return total;
    }
}