blob: d9692655f55a352731237a5f1fe61c097c507224 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Base class for persistent segment caches that can be chained together.
 * <p>
 * A lookup first consults this cache via {@link #readSegmentInternal(long, long)}.
 * On a miss the request is forwarded either to the cache registered with
 * {@link #linkWith(AbstractPersistentCache)} or, when no such cache exists, to the
 * loader supplied by the caller. A segment obtained that way is handed to
 * {@code writeSegment} so that it is stored in this cache as well.
 * <p>
 * NOTE(review): {@link #segmentCacheStats} is not assigned anywhere in this class;
 * subclasses are expected to initialise it before {@link #readSegment} is used,
 * otherwise the statistics updates will fail with a {@link NullPointerException}.
 */
public abstract class AbstractPersistentCache implements PersistentCache, Closeable {

    private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class);

    /**
     * Size of the thread pool backing {@link #executor}. Read once from the system
     * property {@code oak.segment.cache.threads}; defaults to 10.
     */
    public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10);

    /**
     * Fixed-size thread pool created in the constructor and shut down in {@link #close()}.
     * Presumably used by subclasses for background cache writes - not used for task
     * submission in this class.
     */
    protected ExecutorService executor;

    /**
     * Counter starting at zero; not modified in this class.
     * Presumably tracks the size of the cached data in subclasses - confirm against implementations.
     */
    protected AtomicLong cacheSize = new AtomicLong(0);

    /** The cache consulted after a miss in this cache, or {@code null} if none was linked. */
    protected PersistentCache nextCache;

    /**
     * Concurrent set whose size is reported by {@link #getWritesPending()}. It is not
     * populated in this class; presumably subclasses add an entry per segment that is
     * queued for writing - confirm against implementations.
     */
    protected final Set<String> writesPending;

    /** Hit, miss and load statistics updated by {@link #readSegment}; not assigned by this class. */
    protected SegmentCacheStats segmentCacheStats;

    /**
     * Creates the thread pool with {@link #THREADS} threads and an empty set of pending writes.
     */
    public AbstractPersistentCache() {
        executor = Executors.newFixedThreadPool(THREADS);
        writesPending = ConcurrentHashMap.newKeySet();
    }

    /**
     * Registers the cache that is queried when a segment is not found in this cache.
     *
     * @param nextCache the cache to fall back to
     * @return the {@code nextCache} argument, which allows calls to be chained
     */
    public PersistentCache linkWith(AbstractPersistentCache nextCache) {
        this.nextCache = nextCache;
        return nextCache;
    }

    /**
     * Reads a segment from this cache, falling back to the linked cache or to
     * {@code loader} on a miss.
     * <p>
     * Hit and miss counters are updated on every call. When the fallback yields a
     * non-null segment, the load is recorded as successful and the segment is passed to
     * {@code writeSegment}. A {@link RepositoryNotReachableException} is recorded as a
     * failed load and rethrown; any other exception is logged, recorded as a failed load
     * and not propagated. If the swallowed exception is an {@link InterruptedException},
     * the interrupt status of the current thread is restored.
     *
     * @param msb    the most significant bits of the identifier of the segment
     * @param lsb    the least significant bits of the identifier of the segment
     * @param loader used to load the segment when neither this cache nor a linked cache has it
     * @return the segment data, or {@code null} if it could not be obtained
     * @throws RepositoryNotReachableException if raised while loading the segment
     */
    @Override
    public Buffer readSegment(long msb, long lsb, @NotNull Callable<Buffer> loader) {
        Buffer segment = readSegmentInternal(msb, lsb);
        if (segment != null) {
            segmentCacheStats.hitCount.incrementAndGet();
            return segment;
        }
        segmentCacheStats.missCount.incrementAndGet();

        // Either use the next cache or the 'loader'
        Callable<Buffer> nextLoader = nextCache != null
                ? () -> nextCache.readSegment(msb, lsb, loader)
                : loader;

        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            segment = nextLoader.call();
            if (segment != null) {
                recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), true);
                writeSegment(msb, lsb, segment);
            }
            return segment;
        } catch (RepositoryNotReachableException e) {
            recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), false);
            // rethrow exception so that this condition can be distinguished from other types of errors (see OAK-9303)
            throw e;
        } catch (Exception t) {
            if (t instanceof InterruptedException) {
                // Callable.call() may throw InterruptedException. Since the exception is not
                // propagated, restore the flag so that callers can still observe the interrupt.
                currentThread().interrupt();
            }
            logger.error("Exception while loading segment {} from remote store or linked cache", new UUID(msb, lsb), t);
            recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), false);
        }
        // Reached only after a swallowed exception: null if the load itself failed,
        // the loaded segment if the failure happened after the load.
        return segment;
    }

    /**
     * Reads the segment from the cache.
     * If segment is not found, this method does not query next cache that was set with {@link #linkWith(AbstractPersistentCache)}
     *
     * @param msb the most significant bits of the identifier of the segment
     * @param lsb the least significant bits of the identifier of the segment
     * @return byte buffer containing the segment data or null if the segment doesn't exist
     */
    protected abstract Buffer readSegmentInternal(long msb, long lsb);

    /**
     * Records time spent to load data from external source, after cache miss.
     * The load time is accumulated regardless of the outcome; the outcome selects
     * which of the success / exception counters is incremented.
     *
     * @param loadTime   load time in nanoseconds
     * @param successful indicates whether loading of the segment into cache was successful
     */
    protected final void recordCacheLoadTimeInternal(long loadTime, boolean successful) {
        if (successful) {
            segmentCacheStats.loadSuccessCount.incrementAndGet();
        } else {
            segmentCacheStats.loadExceptionCount.incrementAndGet();
        }
        segmentCacheStats.loadTime.addAndGet(loadTime);
    }

    /**
     * @return Statistics for this cache.
     */
    @NotNull
    public AbstractCacheStats getCacheStats() {
        return segmentCacheStats;
    }

    /**
     * Shuts down the executor and waits up to 60 seconds for already submitted tasks
     * to finish. A timeout is only logged. If the waiting thread is interrupted, the
     * interruption is logged and the interrupt status of the thread is restored.
     */
    @Override
    public void close() {
        try {
            executor.shutdown();
            if (executor.awaitTermination(60, SECONDS)) {
                logger.debug("The persistent cache scheduler was successfully shut down");
            } else {
                logger.warn("The persistent cache scheduler takes too long to shut down");
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupt while shutting down the persistent cache scheduler", e);
            currentThread().interrupt();
        }
    }

    /**
     * @return the current number of entries in {@link #writesPending}
     */
    public int getWritesPending() {
        return writesPending.size();
    }
}