blob: 731884339a0e7461bde4f1480d3c0eb92d88c976 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.jackrabbit.oak.segment.remote.persistentcache;
import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
public class PersistentDiskCache extends AbstractPersistentCache {
private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class);
public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512;
public static final String NAME = "Segment Disk Cache";
private final File directory;
private final long maxCacheSizeBytes;
private final IOMonitor diskCacheIOMonitor;
final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
final AtomicLong evictionCount = new AtomicLong();
private static final Comparator<Path> sortedByAccessTime = (path1, path2) -> {
try {
FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime();
FileTime lastAccessFile2 = Files.readAttributes(path2, BasicFileAttributes.class).lastAccessTime();
return lastAccessFile1.compareTo(lastAccessFile2);
} catch (IOException e) {
logger.error("A problem occurred while cleaning up the cache: ", e);
}
return 0;
};
public PersistentDiskCache(File directory, int cacheMaxSizeMB, IOMonitor diskCacheIOMonitor) {
this.directory = directory;
this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L;
this.diskCacheIOMonitor = diskCacheIOMonitor;
if (!directory.exists()) {
directory.mkdirs();
}
segmentCacheStats = new SegmentCacheStats(
NAME,
() -> maxCacheSizeBytes,
() -> Long.valueOf(directory.listFiles().length),
() -> FileUtils.sizeOfDirectory(directory),
() -> evictionCount.get());
}
@Override
protected Buffer readSegmentInternal(long msb, long lsb) {
try {
String segmentId = new UUID(msb, lsb).toString();
File segmentFile = new File(directory, segmentId);
Stopwatch stopwatch = Stopwatch.createStarted();
if (segmentFile.exists()) {
diskCacheIOMonitor.beforeSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length());
try (FileInputStream fis = new FileInputStream(segmentFile); FileChannel channel = fis.getChannel()) {
int length = (int) channel.size();
Buffer buffer;
if (OFF_HEAP) {
buffer = Buffer.allocateDirect(length);
} else {
buffer = Buffer.allocate(length);
}
if (buffer.readFully(channel, 0) < length) {
throw new EOFException();
}
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
diskCacheIOMonitor.afterSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length(), elapsed);
buffer.flip();
return buffer;
} catch (FileNotFoundException e) {
logger.info("Segment {} deleted from file system!", segmentId);
} catch (IOException e) {
logger.error("Error loading segment {} from cache:", segmentId, e);
}
}
} catch (Exception e) {
logger.error("Exception while reading segment {} from the cache:", new UUID(msb, lsb), e);
}
return null;
}
@Override
public boolean containsSegment(long msb, long lsb) {
return new File(directory, new UUID(msb, lsb).toString()).exists();
}
@Override
public void writeSegment(long msb, long lsb, Buffer buffer) {
String segmentId = new UUID(msb, lsb).toString();
File segmentFile = new File(directory, segmentId);
File tempSegmentFile = new File(directory, segmentId + System.nanoTime() + ".part");
Buffer bufferCopy = buffer.duplicate();
Runnable task = () -> {
if (writesPending.add(segmentId)) {
try {
int fileSize;
try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) {
fileSize = bufferCopy.write(channel);
}
try {
Files.move(tempSegmentFile.toPath(), segmentFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
Files.move(tempSegmentFile.toPath(), segmentFile.toPath());
}
cacheSize.addAndGet(fileSize);
} catch (Exception e) {
logger.error("Error writing segment {} to cache: {}", segmentId, e);
try {
Files.deleteIfExists(segmentFile.toPath());
Files.deleteIfExists(tempSegmentFile.toPath());
} catch (IOException i) {
logger.error("Error while deleting corrupted segment file {}", segmentId, i);
}
} finally {
writesPending.remove(segmentId);
}
}
cleanUp();
};
executor.execute(task);
}
private boolean isCacheFull() {
return cacheSize.get() >= maxCacheSizeBytes;
}
@Override
public void cleanUp() {
if (!cleanupInProgress.getAndSet(true)) {
try {
cleanUpInternal();
} finally {
cleanupInProgress.set(false);
}
}
}
private void cleanUpInternal() {
if (isCacheFull()) {
try {
Stream<Path> segmentsPaths = Files.walk(directory.toPath())
.sorted(sortedByAccessTime)
.filter(filePath -> !filePath.toFile().isDirectory());
StreamConsumer.forEach(segmentsPaths, (path, breaker) -> {
if (cacheSize.get() > maxCacheSizeBytes * 0.66) {
cacheSize.addAndGet(-path.toFile().length());
path.toFile().delete();
evictionCount.incrementAndGet();
} else {
breaker.stop();
}
});
} catch (IOException e) {
logger.error("A problem occurred while cleaning up the cache: ", e);
}
}
}
static class StreamConsumer {
public static class Breaker {
private boolean shouldBreak = false;
public void stop() {
shouldBreak = true;
}
boolean get() {
return shouldBreak;
}
}
public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) {
Spliterator<T> spliterator = stream.spliterator();
boolean hadNext = true;
Breaker breaker = new Breaker();
while (hadNext && !breaker.get()) {
hadNext = spliterator.tryAdvance(elem -> {
consumer.accept(elem, breaker);
});
}
}
}
}