/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.blob;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.cache.Weigher;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Objects.toStringHelper;
import static java.lang.String.format;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils.movePendingUploadsToStaging;
/**
* Cache for staging asynchronous uploads. It serves as a temporary cache that
* answers local requests until the upload has been synced with the backend.
* <p>
* The appropriate backend for this cache is wrapped in a {@link StagingUploader}
* implementation.
* <p>
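* A minimal usage sketch; the directories, uploader and numeric values are
* illustrative, not prescribed by this API:
* <pre>{@code
* // 4 upload threads, 64 MB staging size, purge every 300s, retry every 600s
* UploadStagingCache cache = UploadStagingCache.build(
*     rootDir, homeDir, 4, 64L * 1024 * 1024, uploader,
*     null, StatisticsProvider.NOOP, null, null, 300, 600);
* Optional<SettableFuture<Integer>> staged = cache.put(id, file);
* }</pre>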
*/
public class UploadStagingCache implements Closeable {
/**
* Logger instance.
*/
private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
protected static final String UPLOAD_STAGING_DIR = "upload";
// Rough estimate of the in-memory size of a key, value pair
private final Weigher<String, File> memWeigher = new Weigher<String, File>() {
@Override public int weigh(String key, File value) {
return (StringUtils.estimateMemoryUsage(key) +
StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48);
}};
/**
* Max size of the upload staging cache in bytes
*/
private long size;
/**
* Current cache size in bytes
*/
private AtomicLong currentSize;
/**
* Executor for async uploads
*/
private ListeningExecutorService executor;
/**
* Scheduled executor for the purge and retry jobs
*/
private ScheduledExecutorService scheduledExecutor;
/**
* In memory map for staged files
*/
private ConcurrentMap<String, File> map;
/**
* In memory map for files to be deleted after uploads
*/
private ConcurrentMap<String, File> attic;
/**
* Local directory where uploads are staged
*/
private File uploadCacheSpace;
/**
* Wrapper around the backend where the blobs are uploaded/written
*/
private StagingUploader uploader;
/**
* Cache stats
*/
private StagingCacheStats cacheStats;
/**
* Handle for download cache if any
*/
@Nullable
private FileCache downloadCache;
/**
* Scheduled executor for the default statistics provider, if required
*/
private ScheduledExecutorService statsExecutor;
/**
* Queue containing items to retry.
*/
private LinkedBlockingQueue<String> retryQueue;
private UploadStagingCache(File dir, File home, int uploadThreads, long size /* bytes */,
StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider,
@Nullable ListeningExecutorService executor,
@Nullable ScheduledExecutorService scheduledExecutor,
int purgeInterval /* secs */, int retryInterval /* secs */) {
this.currentSize = new AtomicLong();
this.size = size;
this.executor = executor;
if (executor == null) {
this.executor = MoreExecutors.listeningDecorator(Executors
.newFixedThreadPool(uploadThreads, new NamedThreadFactory("oak-ds-async-upload-thread")));
}
this.scheduledExecutor = scheduledExecutor;
if (scheduledExecutor == null) {
this.scheduledExecutor = Executors
.newScheduledThreadPool(2, new NamedThreadFactory("oak-ds-cache-scheduled-thread"));
}
this.map = Maps.newConcurrentMap();
this.attic = Maps.newConcurrentMap();
this.retryQueue = new LinkedBlockingQueue<String>();
this.uploadCacheSpace = new File(dir, UPLOAD_STAGING_DIR);
this.uploader = uploader;
if (statisticsProvider == null) {
statsExecutor = Executors.newSingleThreadScheduledExecutor();
statisticsProvider = new DefaultStatisticsProvider(statsExecutor);
}
this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
this.downloadCache = cache;
build(home, dir);
this.scheduledExecutor
.scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
this.scheduledExecutor
.scheduleAtFixedRate(new RetryJob(), retryInterval, retryInterval, TimeUnit.SECONDS);
}
private UploadStagingCache() {
}
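/**
* Builds an upload staging cache, or a no-op instance if the given size is
* not positive.
*
* @param dir the root directory for the staging area
* @param home the home of the repo
* @param uploadThreads the number of threads for async uploads
* @param size max size of the upload staging cache in bytes
* @param uploader wrapper to where the blobs are uploaded/written
* @param cache handle for the download cache, if any
* @param statisticsProvider provider for stats; a default is created if null
* @param executor executor for async uploads; a default is created if null
* @param scheduledExecutor executor for the purge and retry jobs; a default is created if null
* @param purgeInterval interval in seconds for purging uploaded files
* @param retryInterval interval in seconds for retrying failed uploads
* @return an UploadStagingCache instance
*/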
public static UploadStagingCache build(File dir, File home, int uploadThreads, long size
/* bytes */, StagingUploader uploader, @Nullable FileCache cache,
StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor,
@Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval /* secs */,
int retryInterval /* secs */) {
if (size > 0) {
return new UploadStagingCache(dir, home, uploadThreads, size, uploader, cache,
statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
}
return new UploadStagingCache() {
@Override public Optional<SettableFuture<Integer>> put(String id, File input) {
return Optional.absent();
}
@Override protected void invalidate(String key) {
}
@Override protected Iterator<String> getAllIdentifiers() {
return Iterators.emptyIterator();
}
@Nullable @Override public File getIfPresent(String key) {
return null;
}
@Override public DataStoreCacheStatsMBean getStats() {
return new StagingCacheStats(this, StatisticsProvider.NOOP, 0);
}
@Override public void close() {
}
};
}
/**
* Retrieves all the files staged in the staging area and schedules them for upload.
* @param home the home of the repo
* @param rootPath the parent of the cache
*/
private void build(File home, File rootPath) {
LOG.info("Scheduling pending uploads");
// Move any pending uploads from the older cache structure into the staging area
movePendingUploadsToStaging(home, rootPath, true);
Iterator<File> iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace)
.filter(new Predicate<File>() {
@Override public boolean apply(File input) {
return input.isFile();
}
}).iterator();
int count = 0;
while (iter.hasNext()) {
File toBeSyncedFile = iter.next();
Optional<SettableFuture<Integer>> scheduled =
putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true);
if (scheduled.isPresent()) {
count++;
} else {
LOG.info("File [{}] not setup for upload", toBeSyncedFile.getName());
}
}
LOG.info("Scheduled [{}] pending uploads", count);
}
/**
* Puts the file into the staging cache if possible.
* Returns an Optional containing a SettableFuture if staged for upload,
* otherwise an absent Optional.
*
* @param id the id of the file to be staged
* @param input the file to be staged
* @return an Optional SettableFuture whose value is set to
* 1 when the async upload completes successfully, or
* 0 if the given id is already pending upload
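* <p>
* A sketch of consuming the result; the callback shown is illustrative:
* <pre>{@code
* Optional<SettableFuture<Integer>> staged = cache.put(id, file);
* if (staged.isPresent()) {
*     Futures.addCallback(staged.get(), callback);
* }
* }</pre>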
*/
public Optional<SettableFuture<Integer>> put(String id, File input) {
return putOptionalDisregardingSize(id, input, false);
}
/**
* Puts the file into the staging cache, disregarding the configured size limit
* if ignoreSize is true and otherwise only when the cache has capacity.
* Returns an optional SettableFuture if staged for upload, otherwise empty.
*
* @param id the id of the file to be staged
* @param input the file to be staged
* @param ignoreSize whether to disregard the configured cache size limit
* @return an Optional SettableFuture as described in {@link #put(String, File)}
*/
private Optional<SettableFuture<Integer>> putOptionalDisregardingSize(String id, File input,
boolean ignoreSize) {
cacheStats.markRequest();
long length = input.length();
File uploadFile = DataStoreCacheUtils.getFile(id, uploadCacheSpace);
// proceed if either ignoreSize is set (still accounting for the length) or the
// new size is within the limit, and the id has neither completed upload
// (present in attic) nor is already scheduled for upload (present in map)
if (((ignoreSize && currentSize.addAndGet(length) >= 0)
|| currentSize.addAndGet(length) <= size)
&& !attic.containsKey(id)
&& map.putIfAbsent(id, uploadFile) == null ) {
try {
if (!uploadFile.exists()) {
FileUtils.moveFile(input, uploadFile);
LOG.trace("File [{}] moved to staging cache [{}]", input, uploadFile);
}
// update stats
cacheStats.markHit();
cacheStats.incrementCount();
cacheStats.incrementSize(length);
cacheStats.incrementMemSize(memWeigher.weigh(id, uploadFile));
return Optional.of(stage(id, uploadFile));
} catch (Exception e) {
LOG.info("Error moving file to staging", e);
//reset the current state and return empty flag as not added to cache
currentSize.addAndGet(-length);
map.remove(id, uploadFile);
}
} else {
currentSize.addAndGet(-length);
// if file is still pending upload, count it as present
if (map.containsKey(id) || attic.containsKey(id)) {
SettableFuture<Integer> result = SettableFuture.create();
result.set(0);
return Optional.of(result);
}
}
return Optional.absent();
}
/**
* Stages the file for async upload:
* <ul>
*     <li>Puts the file into the staging file system directory</li>
*     <li>Schedules an async job to write it to the backend using the given
*     {@link StagingUploader}</li>
*     <li>Updates the internal map and size variable</li>
*     <li>Adds a callback listener to remove the file once the upload has finished</li>
* </ul>
* @param id id of the file to be staged
* @param upload the file to be staged
* @return a SettableFuture instance
*/
private SettableFuture<Integer> stage(final String id, final File upload) {
final SettableFuture<Integer> result = SettableFuture.create();
try {
// create an async job
ListenableFuture<Integer> future = executor.submit(new Callable<Integer>() {
@Override public Integer call() throws Exception {
try {
final TimerStats.Context uploadContext = cacheStats.startUpLoaderTimer();
uploader.write(id, upload);
LOG.debug("File added to backend [{}]", upload);
uploadContext.stop();
return 1;
} catch (Exception e) {
LOG.error("Error adding file to backend", e);
throw e;
}
}
});
// Add a callback to the returned Future object for handling success and error
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override public void onSuccess(@Nullable Integer r) {
LOG.info("Successfully added [{}], [{}]", id, upload);
try {
// move to attic to be deleted and remove from in-memory map
attic.put(id, upload);
// Add the uploaded file to the download cache if available
if (downloadCache != null) {
// Touch the file to update timestamp and record length
Files.touch(upload);
downloadCache.put(id, upload);
LOG.debug("[{}] added to cache", id);
}
map.remove(id);
} catch (IOException e) {
LOG.warn("Error in cleaning up [{}] from staging", upload);
}
result.set(r);
}
@Override public void onFailure(Throwable t) {
LOG.error("Error adding [{}] with file [{}] to backend", id, upload, t);
result.setException(t);
retryQueue.add(id);
}
});
LOG.debug("File [{}] scheduled for upload [{}]", upload, result);
} catch (Exception e) {
LOG.error("Error staging file for upload [{}]", upload, e);
}
return result;
}
/**
* Invalidates the given key, deleting its staged file unless it is already
* scheduled for deletion.
* @param key to invalidate
*/
protected void invalidate(String key) {
// Check if not already scheduled for deletion
if (!attic.containsKey(key) && map.containsKey(key)) {
try {
LOG.debug("Invalidating [{}]", key);
File toBeDeleted = map.get(key);
deleteInternal(key, toBeDeleted);
map.remove(key, toBeDeleted);
} catch (IOException e) {
LOG.warn("Could not delete file from staging", e);
}
}
}
/**
* Returns all identifiers presently staged.
*
* @return iterator of all identifiers presently staged.
*/
protected Iterator<String> getAllIdentifiers() {
return map.keySet().iterator();
}
/**
* Removes all uploaded files tracked in the attic and deletes them from the
* staging area.
*/
private void remove() {
LOG.info("Starting purge of uploaded files");
Iterator<String> iterator = attic.keySet().iterator();
int count = 0;
while (iterator.hasNext()) {
String key = iterator.next();
try {
// Check if not already scheduled for upload
if (!map.containsKey(key)) {
LOG.trace("upload map contains id [{}]", key);
File toBeDeleted = attic.get(key);
deleteInternal(key, toBeDeleted);
iterator.remove();
LOG.debug("Cache [{}] file deleted for id [{}]", toBeDeleted, key);
count++;
}
} catch (IOException e) {
LOG.error("Error in removing entry for id [{}]", key);
}
}
LOG.info("Finished removal of [{}] files", count);
}
/**
* Adjust stats and delete file.
*
* @param key to delete
* @param toBeDeleted file to delete
* @throws IOException if the file could not be deleted
*/
private void deleteInternal(String key, File toBeDeleted) throws IOException {
LOG.debug("Trying to delete file [{}]", toBeDeleted);
long length = toBeDeleted.length();
DataStoreCacheUtils.recursiveDelete(toBeDeleted, uploadCacheSpace);
LOG.debug("deleted file [{}]", toBeDeleted);
currentSize.addAndGet(-length);
// Update stats for removal
cacheStats.decrementSize(length);
cacheStats.decrementMemSize(memWeigher.weigh(key, toBeDeleted));
cacheStats.decrementCount();
}
/**
* Returns the File if present or null otherwise.
* Any usage of the returned file should check for its existence, as the file
* could be purged from the file system once uploaded via the internal scheduled
* remove mechanism.
*
* @param key of the file to check
* @return a File object if found
*/
@Nullable
public File getIfPresent(String key) {
cacheStats.markLoad();
File file = map.get(key);
if (file != null) {
cacheStats.markLoadSuccess();
return file;
}
return null;
}
/**
* Cache-related stats.
*
* @return an instance of the {@link DataStoreCacheStatsMBean}.
*/
public DataStoreCacheStatsMBean getStats() {
return cacheStats;
}
@Override
public void close() {
LOG.info("Uploads in progress on close [{}]", map.size());
LOG.info("Uploads completed but not cleared from cache [{}]", attic.size());
LOG.info("Staging cache stats on close [{}]", cacheStats.cacheInfoAsString());
new ExecutorCloser(executor).close();
new ExecutorCloser(scheduledExecutor).close();
new ExecutorCloser(statsExecutor).close();
}
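/**
* Sets the handle to the download cache, to which successfully uploaded files
* are also added.
*/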
protected void setDownloadCache(@Nullable FileCache downloadCache) {
this.downloadCache = downloadCache;
}
/**
* Job which purges all uploaded files from the attic.
*/
class RemoveJob implements Runnable {
@Override
public void run() {
remove();
}
}
/**
* Job to retry failed uploads.
*/
class RetryJob implements Runnable {
@Override
public void run() {
LOG.debug("Retry job started");
int count = 0;
List<String> entries = Lists.newArrayList();
retryQueue.drainTo(entries);
for (String key : entries) {
File file = map.get(key);
LOG.info("Retrying upload of id [{}] with file [{}] ", key, file);
stage(key, file);
count++;
LOG.info("Scheduled retry for upload of id [{}] with file [{}]", key, file);
}
LOG.debug("Retry job finished with staging [{}] jobs", count);
}
}
}
/**
* Upload Staging Cache Statistics.
*/
class StagingCacheStats extends AnnotatedStandardMBean implements DataStoreCacheStatsMBean {
private static final String HITS = "HITS";
private static final String REQUESTS = "REQUESTS";
private static final String UPLOAD_TIMER = "UPLOAD_TIMER";
private static final String LOAD_SUCCESS = "CACHE_LOAD_SUCCESS";
private static final String LOAD = "CACHE_LOAD";
private static final String CURRENT_SIZE = "CURRENT_SIZE";
private static final String CURRENT_MEM_SIZE = "CURRENT_MEM_SIZE";
private static final String COUNT = "COUNT";
private final String cacheName;
/** Max size in bytes configured for the cache **/
private final long maxWeight;
/** Tracking the number of uploads that could be staged **/
private final MeterStats hitMeter;
/** Tracking the number of requests to upload & stage **/
private final MeterStats requestMeter;
/** Tracking the number of get requests serviced by the cache **/
private final MeterStats loadSuccessMeter;
/** Tracking the number of get requests received by the cache **/
private final MeterStats loadMeter;
/** Tracking the upload time **/
private final TimerStats uploadTimer;
/** Tracking the current size in bytes **/
private final CounterStats currentSizeMeter;
/** Tracking the in-memory size of cache **/
private final CounterStats currentMemSizeMeter;
/** Tracking the cache element count **/
private final CounterStats countMeter;
/** Handle to the cache **/
private final UploadStagingCache cache;
StagingCacheStats(UploadStagingCache cache, StatisticsProvider provider, long maxWeight) {
super(DataStoreCacheStatsMBean.class);
this.cache = cache;
StatisticsProvider statisticsProvider = provider;
// Configure cache name
cacheName = "DataStore-StagingCache";
this.maxWeight = maxWeight;
// Fetch stats and time series
String statName;
statName = getStatName(HITS, cacheName);
hitMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(REQUESTS, cacheName);
requestMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(UPLOAD_TIMER, cacheName);
uploadTimer = statisticsProvider.getTimer(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(LOAD_SUCCESS, cacheName);
loadSuccessMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(LOAD, cacheName);
loadMeter = statisticsProvider.getMeter(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(CURRENT_SIZE, cacheName);
currentSizeMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(CURRENT_MEM_SIZE, cacheName);
currentMemSizeMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
statName = getStatName(COUNT, cacheName);
countMeter = statisticsProvider.getCounterStats(statName, StatsOptions.METRICS_ONLY);
}
//~--------------------------------------< stats update methods
void markHit() {
hitMeter.mark();
}
void markRequest() {
requestMeter.mark();
}
void markLoadSuccess() {
loadSuccessMeter.mark();
}
void markLoad() {
loadMeter.mark();
}
TimerStats.Context startUpLoaderTimer() {
return this.uploadTimer.time();
}
void incrementCount() {
countMeter.inc();
}
void incrementSize(long size) {
currentSizeMeter.inc(size);
}
void incrementMemSize(long size) {
currentMemSizeMeter.inc(size);
}
void decrementCount() {
countMeter.dec();
}
void decrementSize(long size) {
currentSizeMeter.dec(size);
}
void decrementMemSize(long size) {
currentMemSizeMeter.dec(size);
}
@Override
public String getName() {
return cacheName;
}
@Override
public long getRequestCount() {
return requestMeter.getCount();
}
@Override
public long getHitCount() {
return hitMeter.getCount();
}
@Override
public double getHitRate() {
long hitCount = hitMeter.getCount();
long requestCount = requestMeter.getCount();
return (requestCount == 0L ? 0L : (double)hitCount/requestCount);
}
@Override
public long getMissCount() {
return requestMeter.getCount() - hitMeter.getCount();
}
@Override
public double getMissRate() {
long missCount = getMissCount();
long requestCount = requestMeter.getCount();
return (requestCount == 0L ? 0L : (double) missCount/requestCount);
}
@Override
public long getLoadCount() {
return loadMeter.getCount();
}
@Override
public long getLoadSuccessCount() {
return loadSuccessMeter.getCount();
}
@Override
public long getLoadExceptionCount() {
return (getLoadCount() - getLoadSuccessCount());
}
@Override
public double getLoadExceptionRate() {
long loadExceptionCount = getLoadExceptionCount();
long loadCount = loadMeter.getCount();
return (loadCount == 0L ? 0L : (double) loadExceptionCount/loadCount);
}
@Override
public long getElementCount() {
return countMeter.getCount();
}
@Override
public long getMaxTotalWeight() {
return maxWeight;
}
@Override
public long estimateCurrentWeight() {
return currentSizeMeter.getCount();
}
@Override
public long estimateCurrentMemoryWeight() {
return currentMemSizeMeter.getCount();
}
@Override
public String cacheInfoAsString() {
return toStringHelper("StagingCacheStats")
.add("requestCount", getRequestCount())
.add("hitCount", getHitCount())
.add("hitRate", format("%1.2f", getHitRate()))
.add("missCount", getMissCount())
.add("missRate", format("%1.2f", getMissRate()))
.add("loadCount", getLoadCount())
.add("loadSuccessCount", getLoadSuccessCount())
.add("elementCount", getElementCount())
.add("currentMemSize", estimateCurrentMemoryWeight())
.add("totalWeight", humanReadableByteCount(estimateCurrentWeight()))
.add("maxWeight", humanReadableByteCount(getMaxTotalWeight()))
.toString();
}
//~--------------------------------------< CacheStatsMBean - stats that are not (yet) available
@Override
public long getTotalLoadTime() {
return 0;
}
@Override
public double getAverageLoadPenalty() {
return 0;
}
@Override
public long getEvictionCount() {
return 0;
}
@Override
public void resetStats() {
}
//~--------------------------------------< private helpers
private static String getStatName(String meter, String cacheName) {
return cacheName + "." + meter;
}
}
/**
* Wrapper around the backend used for uploading the staged files.
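* <p>
* A minimal sketch of an implementation; {@code Backend} stands in for a
* hypothetical remote store client, not a type defined in this file:
* <pre>{@code
* class BackendStagingUploader implements StagingUploader {
*     private final Backend backend;
*     BackendStagingUploader(Backend backend) { this.backend = backend; }
*     public void write(String id, File f) throws DataStoreException {
*         backend.write(id, f); // upload the staged file under its id
*     }
* }
* }</pre>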
*/
interface StagingUploader {
void write(String id, File f) throws DataStoreException;
}