/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment;
import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
import static org.apache.jackrabbit.guava.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.guava.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static org.apache.jackrabbit.guava.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.lang.Integer.getInteger;
import static java.lang.String.valueOf;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.jackrabbit.oak.api.Type.STRING;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.segment.ClassicCompactor.UPDATE_LIMIT;
import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions;
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.jackrabbit.guava.common.io.ByteStreams;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
import org.apache.jackrabbit.oak.plugins.memory.StringPropertyState;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreGCMonitor;
import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
import org.apache.jackrabbit.oak.segment.tool.Compact;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompactionAndCleanupIT {
private static final Logger log = LoggerFactory
.getLogger(CompactionAndCleanupIT.class);
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
@BeforeClass
public static void init() {
// Allow running gc without backoff. Needed for testCancelCompactionSNFE.
// See FileStore.GC_BACKOFF.
System.setProperty("oak.gc.backoff", "0");
}
private File getFileStoreFolder() {
return folder.getRoot();
}
@Test
public void compactPersistsHead() throws Exception {
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions().setRetainedGenerations(2))
.withMaxFileSize(1)
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
// Create ~2MB of data
NodeBuilder extra = nodeStore.getRoot().builder();
NodeBuilder content = extra.child("content");
for (int i = 0; i < 10000; i++) {
NodeBuilder c = content.child("c" + i);
for (int j = 0; j < 1000; j++) {
c.setProperty("p" + i, "v" + i);
}
}
nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
fileStore.compactFull();
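// The compacted head must also be persisted, not just updated in memory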
assertEquals(fileStore.getRevisions().getHead(), fileStore.getRevisions().getPersistedHead());
} finally {
fileStore.close();
}
}
@Test
public void compactionNoBinaryClone() throws Exception {
ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions().setRetainedGenerations(2))
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.withMaxFileSize(1)
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
// 5MB blob
int blobSize = 5 * 1024 * 1024;
// Create ~2MB of data
NodeBuilder extra = nodeStore.getRoot().builder();
NodeBuilder content = extra.child("content");
for (int i = 0; i < 10000; i++) {
NodeBuilder c = content.child("c" + i);
for (int j = 0; j < 1000; j++) {
c.setProperty("p" + i, "v" + i);
}
}
nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size1 = fileStore.getStats().getApproximateSize();
log.debug("File store size {}", byteCountToDisplaySize(size1));
// Create a property with 5 MB blob
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("blob1", createBlob(nodeStore, blobSize));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size2 = fileStore.getStats().getApproximateSize();
assertTrue("the store should grow", size2 > size1);
assertTrue("the store should grow of at least the size of the blob", size2 - size1 >= blobSize);
// Now remove the property. No gc yet -> size doesn't shrink
builder = nodeStore.getRoot().builder();
builder.removeProperty("blob1");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size3 = fileStore.getStats().getApproximateSize();
assertTrue("the store should grow", size3 > size2);
// 1st gc cycle -> no reclaimable garbage...
fileStore.compactFull();
fileStore.cleanup();
long size4 = fileStore.getStats().getApproximateSize();
// Add another 5MB binary doubling the blob size
builder = nodeStore.getRoot().builder();
builder.setProperty("blob2", createBlob(nodeStore, blobSize));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size5 = fileStore.getStats().getApproximateSize();
assertTrue("the store should grow", size5 > size4);
assertTrue("the store should grow of at least the size of the blob", size5 - size4 >= blobSize);
// 2st gc cycle -> 1st blob should get collected
fileStore.compactFull();
fileStore.cleanup();
long size6 = fileStore.getStats().getApproximateSize();
assertTrue("the store should shrink", size6 < size5);
assertTrue("the store should shrink of at least the size of the blob", size5 - size6 >= blobSize);
// 3rtd gc cycle -> no significant change
fileStore.compactFull();
fileStore.cleanup();
long size7 = fileStore.getStats().getApproximateSize();
// No data loss
byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
.getProperty("blob2").getValue(Type.BINARY).getNewStream());
assertEquals(blobSize, blob.length);
} finally {
fileStore.close();
}
}
@Test
public void offlineCompaction() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions().setOffline();
ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(1)
.withGCOptions(gcOptions)
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
// 5MB blob
int blobSize = 5 * 1024 * 1024;
// Create ~2MB of data
NodeBuilder extra = nodeStore.getRoot().builder();
NodeBuilder content = extra.child("content");
for (int i = 0; i < 10000; i++) {
NodeBuilder c = content.child("c" + i);
for (int j = 0; j < 1000; j++) {
c.setProperty("p" + i, "v" + i);
}
}
nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size1 = fileStore.getStats().getApproximateSize();
log.debug("File store size {}", byteCountToDisplaySize(size1));
// Create a property with 5 MB blob
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("blob1", createBlob(nodeStore, blobSize));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size2 = fileStore.getStats().getApproximateSize();
assertTrue("the store should grow", size2 > size1);
assertTrue("the store should grow of at least the size of the blob", size2 - size1 > blobSize);
// Now remove the property. No gc yet -> size doesn't shrink
builder = nodeStore.getRoot().builder();
builder.removeProperty("blob1");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size3 = fileStore.getStats().getApproximateSize();
assertTrue("the size should grow", size3 > size2);
// 1st gc cycle -> 1st blob should get collected
fileStore.compactFull();
fileStore.cleanup();
long size4 = fileStore.getStats().getApproximateSize();
assertTrue("the store should shrink", size4 < size3);
assertTrue("the store should shrink of at least the size of the blob", size3 - size4 >= blobSize);
// Add another 5MB binary
builder = nodeStore.getRoot().builder();
builder.setProperty("blob2", createBlob(nodeStore, blobSize));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
long size5 = fileStore.getStats().getApproximateSize();
assertTrue("the store should grow", size5 > size4);
assertTrue("the store should grow of at least the size of the blob", size5 - size4 > blobSize);
// 2st gc cycle -> 2nd blob should *not* be collected
fileStore.compactFull();
fileStore.cleanup();
long size6 = fileStore.getStats().getApproximateSize();
assertTrue("the blob should not be collected", size6 > blobSize);
// 3rd gc cycle -> no significant change
fileStore.compactFull();
fileStore.cleanup();
long size7 = fileStore.getStats().getApproximateSize();
assertTrue("the blob should not be collected", size7 > blobSize);
// No data loss
byte[] blob = ByteStreams.toByteArray(nodeStore.getRoot()
.getProperty("blob2").getValue(Type.BINARY).getNewStream());
assertEquals(blobSize, blob.length);
} finally {
fileStore.close();
}
}
@Test
public void cancelOfflineCompaction() throws Exception {
final AtomicBoolean cancelCompaction = new AtomicBoolean(true);
try (FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions().setOffline())
.build()) {
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
// Create ~2MB of data
NodeBuilder extra = nodeStore.getRoot().builder();
NodeBuilder content = extra.child("content");
for (int i = 0; i < 10000; i++) {
NodeBuilder c = content.child("c" + i);
for (int j = 0; j < 1000; j++) {
c.setProperty("p" + i, "v" + i);
}
}
nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
NodeState uncompactedRoot = nodeStore.getRoot();
// Keep cancelling compaction
new Thread(() -> {
while (cancelCompaction.get()) {
fileStore.cancelGC();
}
}).start();
fileStore.compactFull();
// Cancelling compaction must not corrupt the repository. See OAK-7050.
NodeState compactedRoot = nodeStore.getRoot();
assertTrue(compactedRoot.exists());
assertEquals(uncompactedRoot, compactedRoot);
} finally {
cancelCompaction.set(false);
}
}
/**
* Create a lot of data nodes (no binaries) and a few checkpoints, and verify
* that compacting the checkpoints does not cause the size to explode.
*/
@Test
public void offlineCompactionCps() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions().setOffline();
ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(1)
.withGCOptions(gcOptions)
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
// Create ~2MB of data
NodeBuilder extra = nodeStore.getRoot().builder();
NodeBuilder content = extra.child("content");
for (int i = 0; i < 10000; i++) {
NodeBuilder c = content.child("c" + i);
for (int j = 0; j < 1000; j++) {
c.setProperty("p" + i, "v" + i);
}
}
nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
fileStore.compactFull();
fileStore.cleanup();
// Compacts to ~548 kB
long size0 = fileStore.getStats().getApproximateSize();
int cpNo = 4;
Set<String> cps = new HashSet<String>();
for (int i = 0; i < cpNo; i++) {
cps.add(nodeStore.checkpoint(60000));
}
assertEquals(cpNo, cps.size());
for (String cp : cps) {
assertTrue(nodeStore.retrieve(cp) != null);
}
long size1 = fileStore.getStats().getApproximateSize();
assertTrue("the size should grow or stay the same", size1 >= size0);
// TODO the following assertion doesn't say anything useful. The
// conveyed message is "the repository can shrink, grow or stay the
// same, as long as it remains in a 10% margin of the previous size
// that I took out of thin air". It has to be fixed or removed.
// fileStore.compact();
// fileStore.cleanup();
// long size2 = fileStore.getStats().getApproximateSize();
// assertSize("with checkpoints compacted", size2, size1 * 9/10, size1 * 11 / 10);
} finally {
fileStore.close();
}
}
@Test
public void equalContentAfterOC() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions().setOffline();
try (FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(gcOptions)
.build()) {
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
// Add initial content
NodeBuilder rootBuilder = nodeStore.getRoot().builder();
addNodes(rootBuilder, 8, "p");
addProperties(rootBuilder, 3);
nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
NodeState initialRoot = nodeStore.getRoot();
assertTrue(fileStore.compactFull());
NodeState compactedRoot = nodeStore.getRoot();
assertTrue(initialRoot != compactedRoot);
assertEquals(initialRoot, compactedRoot);
}
}
/**
* Create two nodes whose binary properties share the same content and the same
* blob reference. Verify the de-duplication capabilities of compaction.
*/
@Test
public void offlineCompactionBinR1() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions().setOffline();
ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(1)
.withGCOptions(gcOptions)
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders
.builder(fileStore).build();
try {
NodeBuilder extra = nodeStore.getRoot().builder();
NodeBuilder content = extra.child("content");
int blobSize = 5 * 1024 * 1024;
byte[] data = new byte[blobSize];
new Random().nextBytes(data);
Blob b = nodeStore.createBlob(new ByteArrayInputStream(data));
NodeBuilder c1 = content.child("c1");
c1.setProperty("blob1", b);
NodeBuilder c2 = content.child("c2");
c2.setProperty("blob2", b);
nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
int cpNo = 4;
Set<String> cps = new HashSet<String>();
for (int i = 0; i < cpNo; i++) {
cps.add(nodeStore.checkpoint(60000));
}
assertEquals(cpNo, cps.size());
for (String cp : cps) {
assertTrue(nodeStore.retrieve(cp) != null);
}
// ~5MB: the shared blob is de-duplicated by the SegmentWriter
long size1 = fileStore.getStats().getApproximateSize();
fileStore.compactFull();
fileStore.cleanup();
long size2 = fileStore.getStats().getApproximateSize();
assertSize("with compacted binaries", size2, 0, size1 * 11 / 10);
} finally {
fileStore.close();
}
}
/**
* Test for the Offline compaction tool (OAK-5971)
*/
@Test
public void offlineCompactionTool() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions().setOffline();
ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(1)
.withGCOptions(gcOptions)
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
NodeBuilder root = nodeStore.getRoot().builder();
root.child("content");
nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
} finally {
fileStore.close();
}
Compact.builder().withPath(getFileStoreFolder()).build().run();
fileStore = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(1)
.withGCOptions(gcOptions)
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.build();
nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
assertTrue(nodeStore.getRoot().hasChildNode("content"));
} finally {
fileStore.close();
}
}
private static void assertSize(String info, long size, long lower, long upper) {
log.debug("File Store {} size {}, expected in interval [{},{}]",
info, size, lower, upper);
assertTrue("File Store " + info + " size expected in interval " +
"[" + (lower) + "," + (upper) + "] but was: " + (size),
size >= lower && size <= (upper));
}
private static Blob createBlob(NodeStore nodeStore, int size) throws IOException {
byte[] data = new byte[size];
new Random().nextBytes(data);
return nodeStore.createBlob(new ByteArrayInputStream(data));
}
@Test
public void testCancelCompaction()
throws Throwable {
final FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions().setRetainedGenerations(2))
.withMaxFileSize(1)
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
NodeBuilder builder = nodeStore.getRoot().builder();
addNodes(builder, 10, "");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
FutureTask<Boolean> async = runAsync(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
boolean cancelled = false;
for (int k = 0; !cancelled && k < 1000; k++) {
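// compactFull() reports failure (false) when the gc cycle does not
// complete, here because the store is closed underneath it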
cancelled = !fileStore.compactFull();
}
return cancelled;
}
});
// Give the compaction thread a head start
sleepUninterruptibly(1, SECONDS);
fileStore.close();
try {
assertTrue(async.get());
} catch (ExecutionException e) {
if (!(e.getCause() instanceof IllegalStateException)) {
// Throw cause unless this is an ISE thrown by the
// store being already closed, which is kinda expected
throw e.getCause();
}
}
}
/**
* See OAK-5517: SNFE when running compaction after a cancelled gc
*/
@Test
public void testCancelCompactionSNFE()
throws Throwable {
final FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions()
.setRetainedGenerations(2)
.setEstimationDisabled(true))
.withMaxFileSize(1)
.build();
try {
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
final Callable<Void> cancel = new Callable<Void>() {
@Override
public Void call() throws Exception {
// Give the compaction thread a head start
sleepUninterruptibly(1000, MILLISECONDS);
fileStore.cancelGC();
return null;
}
};
for (int k = 0; k < 100; k++) {
NodeBuilder builder = nodeStore.getRoot().builder();
addNodes(builder, 10, k + "-");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
// Cancelling gc should not cause a SNFE on subsequent gc runs
runAsync(cancel);
fileStore.fullGC();
}
} finally {
fileStore.close();
}
}
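// Recursively builds a full binary tree of the given depth below the passed builder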
private static void addNodes(NodeBuilder builder, int depth, String prefix) {
if (depth > 0) {
NodeBuilder child1 = builder.setChildNode(prefix + "1");
addNodes(child1, depth - 1, prefix);
NodeBuilder child2 = builder.setChildNode(prefix + "2");
addNodes(child2, depth - 1, prefix);
}
}
private static void addProperties(NodeBuilder builder, int count) {
for (int c = 0; c < count; c++) {
builder.setProperty("p-" + c, "v-" + c);
}
for (String child : builder.getChildNodeNames()) {
addProperties(builder.getChildNode(child), count);
}
}
/**
* Regression test for OAK-2192 testing for mixed segments. This test does not
* cover OAK-3348, i.e. it does not assert that the segment graph is free of cross
* gc generation references.
*/
@Test
public void testMixedSegments() throws Exception {
FileStore store = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(2)
.withMemoryMapping(true)
.build();
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(store).build();
AtomicBoolean compactionSuccess = new AtomicBoolean(true);
NodeBuilder root = nodeStore.getRoot().builder();
createNodes(root.setChildNode("test"), 10, 3);
nodeStore.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
Set<UUID> beforeSegments = new HashSet<UUID>();
collectSegments(store.getReader(), store.getHead().getRecordId(),
segmentId -> beforeSegments.add(segmentId.asUUID()));
final AtomicReference<Boolean> run = new AtomicReference<Boolean>(true);
final List<Exception> failedCommits = newArrayList();
Thread[] threads = new Thread[10];
for (int k = 0; k < threads.length; k++) {
final int threadId = k;
threads[k] = new Thread(() -> {
for (int j = 0; run.get(); j++) {
String nodeName = "b-" + threadId + "," + j;
try {
NodeBuilder changes = nodeStore.getRoot().builder();
changes.setChildNode(nodeName);
nodeStore.merge(changes, EmptyHook.INSTANCE, CommitInfo.EMPTY);
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
failedCommits.add(new ExecutionException("Failed commit " + nodeName, e));
}
}
});
threads[k].start();
}
compactionSuccess.set(store.compactFull());
run.set(false);
for (Thread t : threads) {
t.join();
}
store.flush();
assumeTrue("Failed to acquire compaction lock", compactionSuccess.get());
for (Exception failedCommit : failedCommits) {
throw new Exception("A background commit failed", failedCommit);
}
Set<UUID> afterSegments = new HashSet<UUID>();
collectSegments(store.getReader(), store.getHead().getRecordId(),
segmentId -> afterSegments.add(segmentId.asUUID()));
try {
for (UUID u : beforeSegments) {
assertFalse("Mixed segments found: " + u, afterSegments.contains(u));
}
} finally {
store.close();
}
}
@Test
public void testMixedSegmentsGCGeneration() throws Exception {
try (FileStore store = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(2)
.withMemoryMapping(true)
.build()) {
CountDownLatch readyToCompact = new CountDownLatch(1);
CountDownLatch compactionCompleted = new CountDownLatch(1);
SegmentNodeBuilder changes = store.getHead().builder();
changes.setProperty("a", "a");
changes.setProperty(new StringPropertyState("b", "b") {
@Override
public String getValue() {
readyToCompact.countDown();
awaitUninterruptibly(compactionCompleted);
return super.getValue();
}
});
// Overlap an ongoing write operation triggered by the call to getNodeState
// with a full compaction. This should not cause the written node state
// to reference segments from multiple generations
FutureTask<SegmentNodeState> futureNodeState = runAsync(changes::getNodeState);
readyToCompact.await();
store.compactFull();
compactionCompleted.countDown();
// The node state from the write operation that started before
// compaction completed should reference only segments from generation 0.
SegmentNodeState gen0NodeState = futureNodeState.get();
collectSegments(store.getReader(), gen0NodeState.getRecordId(),
segmentId -> assertEquals("Full generation should be 0",
0, segmentId.getGcGeneration().getFullGeneration()));
// Retrieving the node state again should trigger a rewriting to
// the next generation (1).
SegmentNodeState gen1NodeState = changes.getNodeState();
collectSegments(store.getReader(), gen1NodeState.getRecordId(),
segmentId -> assertEquals("Full generation should be 1",
1, segmentId.getGcGeneration().getFullGeneration()));
}
}
/**
* Set a root node referring to a child node that lives in a different segment. Depending
* on the order in which the SegmentBufferWriters associated with the threads used to create
* the nodes are flushed, this will introduce a forward reference between the segments.
* The current cleanup mechanism cannot handle forward references and removes the referenced
* segment, causing an SNFE.
* This is a regression introduced with OAK-1828.
*/
@Test
public void cleanupCyclicGraph() throws Exception {
FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build();
final SegmentReader reader = fileStore.getReader();
final SegmentWriter writer = fileStore.getWriter();
final BlobStore blobStore = fileStore.getBlobStore();
final SegmentNodeState oldHead = fileStore.getHead();
final SegmentNodeState child = run(new Callable<SegmentNodeState>() {
@Override
public SegmentNodeState call() throws Exception {
return new SegmentNodeState(reader, writer, blobStore, writer.writeNode(EMPTY_NODE));
}
});
SegmentNodeState newHead = run(new Callable<SegmentNodeState>() {
@Override
public SegmentNodeState call() throws Exception {
NodeBuilder builder = oldHead.builder();
builder.setChildNode("child", child);
return new SegmentNodeState(reader, writer, blobStore, writer.writeNode(builder.getNodeState()));
}
});
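// Flush the writer and atomically update the head, persisting the
// forward reference between the segments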
writer.flush();
fileStore.getRevisions().setHead(oldHead.getRecordId(), newHead.getRecordId());
fileStore.close();
fileStore = fileStoreBuilder(getFileStoreFolder()).build();
traverse(fileStore.getHead());
fileStore.cleanup();
// Traversal after cleanup might result in an SNFE
traverse(fileStore.getHead());
fileStore.close();
}
private static void traverse(NodeState node) {
for (ChildNodeEntry childNodeEntry : node.getChildNodeEntries()) {
traverse(childNodeEntry.getNodeState());
}
}
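// Executes the callable on a fresh thread (and thus with its own
// SegmentBufferWriter) and waits for its result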
private static <T> T run(Callable<T> callable) throws InterruptedException, ExecutionException {
FutureTask<T> task = new FutureTask<T>(callable);
new Thread(task).start();
return task.get();
}
private static <T> FutureTask<T> runAsync(Callable<T> callable) {
FutureTask<T> task = new FutureTask<T>(callable);
new Thread(task).start();
return task;
}
/**
* Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
*/
@Test
public void preCompactionReferences() throws Exception {
for (String ref : new String[] {"merge-before-compact", "merge-after-compact"}) {
File repoDir = new File(getFileStoreFolder(), ref);
FileStore fileStore = fileStoreBuilder(repoDir)
.withMaxFileSize(2)
.withGCOptions(defaultGCOptions())
.build();
final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
try {
// add some content
NodeBuilder preGCBuilder = nodeStore.getRoot().builder();
preGCBuilder.setChildNode("test").setProperty("blob", createBlob(nodeStore, 1024 * 1024));
nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// remove it again so we have something to gc
preGCBuilder = nodeStore.getRoot().builder();
preGCBuilder.getChildNode("test").remove();
nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// with a new builder simulate exceeding the update limit.
// This will cause changes to be pre-written to segments
preGCBuilder = nodeStore.getRoot().builder();
preGCBuilder.setChildNode("test").setChildNode("a").setChildNode("b").setProperty("foo", "bar");
for (int k = 0; k < getInteger("update.limit", 10000); k += 2) {
preGCBuilder.setChildNode("dummy").remove();
}
// case 1: merge above changes before compact
if ("merge-before-compact".equals(ref)) {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setChildNode("n");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
// Ensure cleanup is effective by surpassing the number of
// retained generations
for (int k = 0; k < defaultGCOptions().getRetainedGenerations(); k++) {
fileStore.compactFull();
}
// case 2: merge above changes after compact
if ("merge-after-compact".equals(ref)) {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setChildNode("n");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
nodeStore.merge(preGCBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
} finally {
fileStore.close();
}
// Re-initialise the file store to simulate off-line gc
fileStore = fileStoreBuilder(repoDir).withMaxFileSize(2).build();
try {
// The 1M blob should get gc-ed
fileStore.cleanup();
assertTrue(ref + " repository size " + fileStore.getStats().getApproximateSize() + " < " + 1024 * 1024,
fileStore.getStats().getApproximateSize() < 1024 * 1024);
} finally {
fileStore.close();
}
}
}
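// Walks the record graph starting from headId and reports the segment of every visited record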
private static void collectSegments(SegmentReader reader, RecordId headId, Consumer<SegmentId> onSegment) {
new SegmentParser(reader) {
@Override
protected void onNode(RecordId parentId, RecordId nodeId) {
super.onNode(parentId, nodeId);
onSegment.accept(nodeId.getSegmentId());
}
@Override
protected void onTemplate(RecordId parentId, RecordId templateId) {
super.onTemplate(parentId, templateId);
onSegment.accept(templateId.getSegmentId());
}
@Override
protected void onMap(RecordId parentId, RecordId mapId, MapRecord map) {
super.onMap(parentId, mapId, map);
onSegment.accept(mapId.getSegmentId());
}
@Override
protected void onMapDiff(RecordId parentId, RecordId mapId, MapRecord map) {
super.onMapDiff(parentId, mapId, map);
onSegment.accept(mapId.getSegmentId());
}
@Override
protected void onMapLeaf(RecordId parentId, RecordId mapId, MapRecord map) {
super.onMapLeaf(parentId, mapId, map);
onSegment.accept(mapId.getSegmentId());
}
@Override
protected void onMapBranch(RecordId parentId, RecordId mapId, MapRecord map) {
super.onMapBranch(parentId, mapId, map);
onSegment.accept(mapId.getSegmentId());
}
@Override
protected void onProperty(RecordId parentId, RecordId propertyId, PropertyTemplate template) {
super.onProperty(parentId, propertyId, template);
onSegment.accept(propertyId.getSegmentId());
}
@Override
protected void onValue(RecordId parentId, RecordId valueId, Type<?> type) {
super.onValue(parentId, valueId, type);
onSegment.accept(valueId.getSegmentId());
}
@Override
protected void onBlob(RecordId parentId, RecordId blobId) {
super.onBlob(parentId, blobId);
onSegment.accept(blobId.getSegmentId());
}
@Override
protected void onString(RecordId parentId, RecordId stringId) {
super.onString(parentId, stringId);
onSegment.accept(stringId.getSegmentId());
}
@Override
protected void onList(RecordId parentId, RecordId listId, int count) {
super.onList(parentId, listId, count);
onSegment.accept(listId.getSegmentId());
}
@Override
protected void onListBucket(RecordId parentId, RecordId listId, int index, int count, int capacity) {
super.onListBucket(parentId, listId, index, count, capacity);
onSegment.accept(listId.getSegmentId());
}
}.parseNode(headId);
}
private static void createNodes(NodeBuilder builder, int count, int depth) {
if (depth > 0) {
for (int k = 0; k < count; k++) {
NodeBuilder child = builder.setChildNode("node" + k);
createProperties(child, count);
createNodes(child, count, depth - 1);
}
}
}
private static void createProperties(NodeBuilder builder, int count) {
for (int k = 0; k < count; k++) {
builder.setProperty("property-" + UUID.randomUUID().toString(), "value-" + UUID.randomUUID().toString());
}
}
@Test
public void propertyRetention() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions();
FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withMaxFileSize(1)
.withGCOptions(gcOptions)
.build();
try {
final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
// Add a property
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setChildNode("test").setProperty("property", "value");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// Segment id of the current segment
NodeState test = nodeStore.getRoot().getChildNode("test");
SegmentId id = ((SegmentNodeState) test).getRecordId().getSegmentId();
fileStore.flush();
assertTrue(fileStore.containsSegment(id));
// Add enough content to fill up the current tar file
builder = nodeStore.getRoot().builder();
addContent(builder.setChildNode("dump"));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// Segment and property still there
assertTrue(fileStore.containsSegment(id));
PropertyState property = test.getProperty("property");
assertEquals("value", property.getValue(STRING));
// GC should remove the segment
fileStore.flush();
// Ensure cleanup is effective by surpassing the number of
// retained generations
for (int k = 0; k < gcOptions.getRetainedGenerations(); k++) {
fileStore.compactFull();
}
fileStore.cleanup();
try {
fileStore.readSegment(id);
fail("Segment " + id + " should be gc'ed");
} catch (SegmentNotFoundException ignore) {}
} finally {
fileStore.close();
}
}
@Test
public void checkpointDeduplicationTest() throws Exception {
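// Captures a checkpoint id together with its node state before and after compaction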
class CP {
String id;
NodeState uncompacted;
NodeState compacted;
}
CP[] cps = {new CP(), new CP(), new CP(), new CP()};
try (FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build()) {
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
// Initial content and checkpoint
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setChildNode("a").setChildNode("aa");
builder.setChildNode("b").setChildNode("bb");
builder.setChildNode("c").setChildNode("cc");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
cps[0].id = nodeStore.checkpoint(Long.MAX_VALUE);
// Add content and another checkpoint
builder = nodeStore.getRoot().builder();
builder.setChildNode("1").setChildNode("11");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
cps[1].id = nodeStore.checkpoint(Long.MAX_VALUE);
// Modify content and another checkpoint
builder = nodeStore.getRoot().builder();
builder.getChildNode("a").getChildNode("aa").setChildNode("aaa");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
cps[2].id = nodeStore.checkpoint(Long.MAX_VALUE);
// Remove content and another checkpoint
builder = nodeStore.getRoot().builder();
builder.getChildNode("a").remove();
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
cps[3].id = nodeStore.checkpoint(Long.MAX_VALUE);
// A final bit of content
builder = nodeStore.getRoot().builder();
builder.setChildNode("d").setChildNode("dd");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
NodeState uncompactedSuperRoot = fileStore.getHead();
NodeState uncompactedRoot = nodeStore.getRoot();
for (CP cp : cps) {
cp.uncompacted = nodeStore.retrieve(cp.id);
}
fileStore.compactFull();
NodeState compactedSuperRoot = fileStore.getHead();
NodeState compactedRoot = nodeStore.getRoot();
for (CP cp : cps) {
cp.compacted = nodeStore.retrieve(cp.id);
}
assertEquals(uncompactedSuperRoot, compactedSuperRoot);
assertEquals(uncompactedRoot, compactedRoot);
assertStableIds(uncompactedRoot, compactedRoot, "/root");
for (CP cp : cps) {
assertEquals(cp.uncompacted, cp.compacted);
assertStableIds(cp.uncompacted, cp.compacted, concat("/root/checkpoints", cp.id));
}
}
}
@Test
public void keepStableIdOnFlush() throws Exception {
try (FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build()) {
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
// Initial content and checkpoint
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setChildNode("a");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
nodeStore.checkpoint(Long.MAX_VALUE);
// A final bit of content
builder = nodeStore.getRoot().builder();
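// Exceed the update limit so that part of the transient changes is
// already written to segments before the merge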
for (int k = 0; k < UPDATE_LIMIT; k++) {
builder.setChildNode("b-" + k);
}
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
NodeState uncompactedSuperRoot = fileStore.getHead();
NodeState uncompactedRoot = nodeStore.getRoot();
fileStore.compactFull();
NodeState compactedSuperRoot = fileStore.getHead();
NodeState compactedRoot = nodeStore.getRoot();
assertEquals(uncompactedSuperRoot, compactedSuperRoot);
assertEquals(uncompactedRoot, compactedRoot);
assertStableIds(uncompactedRoot, compactedRoot, "/root");
}
}
@Test
public void crossGCDeduplicationTest() throws Exception {
try (FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).build()) {
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setChildNode("a").setChildNode("aa");
builder.setChildNode("b").setChildNode("bb");
builder.setChildNode("c").setChildNode("cc");
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
NodeState a = nodeStore.getRoot().getChildNode("a");
builder = nodeStore.getRoot().builder();
builder.setChildNode("x").setChildNode("xx");
SegmentNodeState uncompacted = (SegmentNodeState) nodeStore.getRoot();
fileStore.compactFull();
NodeState compacted = nodeStore.getRoot();
assertEquals(uncompacted, compacted);
assertStableIds(uncompacted, compacted, "/root");
builder.setChildNode("y").setChildNode("yy");
builder.getChildNode("a").remove();
NodeState deferCompacted = nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
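// Changes merged after the compaction are deferred compacted: they land in the
// next full generation, but in segments not flagged as written by gc (nonGC)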
assertEquals(
uncompacted.getSegment().getGcGeneration().nextFull().nonGC(),
((SegmentNodeState)deferCompacted).getSegment().getGcGeneration());
}
}
private static void assertStableIds(NodeState node1, NodeState node2, String path) {
assertFalse("Nodes should be equal: " + path, node1 == node2);
assertTrue("Node should be a SegmentNodeState " + path, node1 instanceof SegmentNodeState);
assertTrue("Node should be a SegmentNodeState " + path, node2 instanceof SegmentNodeState);
SegmentNodeState sns1 = (SegmentNodeState) node1;
SegmentNodeState sns2 = (SegmentNodeState) node2;
assertEquals("GC generation should be bumped by one " + path,
sns1.getSegment().getGcGeneration().nextFull(), sns2.getSegment().getGcGeneration());
assertEquals("Nodes should have same stable id: " + path,
sns1.getStableId(), sns2.getStableId());
for (ChildNodeEntry cne : node1.getChildNodeEntries()) {
assertStableIds(
cne.getNodeState(), node2.getChildNode(cne.getName()),
concat(path, cne.getName()));
}
}
/**
* Test asserting OAK-4669: No new generation of tar should be created when the segments are the same
* and when various indices are created.
*/
@Test
public void concurrentWritesCleanupNoNewGen() throws Exception {
ScheduledExecutorService scheduler = newSingleThreadScheduledExecutor();
StatisticsProvider statsProvider = new DefaultStatisticsProvider(scheduler);
final FileStoreGCMonitor fileStoreGCMonitor = new FileStoreGCMonitor(Clock.SIMPLE);
File fileStoreFolder = getFileStoreFolder();
final FileStore fileStore = fileStoreBuilder(fileStoreFolder)
.withGCOptions(defaultGCOptions().setRetainedGenerations(2))
.withGCMonitor(fileStoreGCMonitor)
.withStatisticsProvider(statsProvider)
.withMaxFileSize(1)
.withMemoryMapping(false)
.build();
final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
ExecutorService executorService = newFixedThreadPool(5);
final AtomicInteger counter = new AtomicInteger();
try {
Callable<Void> concurrentWriteTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("blob-" + counter.getAndIncrement(), createBlob(nodeStore, 512 * 512));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
return null;
}
};
List<Future<?>> results = newArrayList();
for (int i = 0; i < 5; i++) {
results.add(executorService.submit(concurrentWriteTask));
}
for (Future<?> result : results) {
assertNull(result.get());
}
fileStore.cleanup();
for (String fileName : fileStoreFolder.list()) {
if (fileName.endsWith(".tar")) {
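// Tar files are named like data00000a.tar: the character just before
// the extension encodes the cleanup generation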
int pos = fileName.length() - "a.tar".length();
char generation = fileName.charAt(pos);
assertTrue("Expected generation is 'a', but instead was: '" + generation + "' for file " + fileName,
generation == 'a');
}
}
} finally {
new ExecutorCloser(executorService).close();
fileStore.close();
new ExecutorCloser(scheduler).close();
}
}
@Test
public void concurrentWritesCleanupZeroReclaimedSize() throws Exception {
ScheduledExecutorService scheduler = newSingleThreadScheduledExecutor();
StatisticsProvider statsProvider = new DefaultStatisticsProvider(scheduler);
final FileStoreGCMonitor fileStoreGCMonitor = new FileStoreGCMonitor(Clock.SIMPLE);
final FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions().setRetainedGenerations(2))
.withGCMonitor(fileStoreGCMonitor)
.withStatisticsProvider(statsProvider)
.withMaxFileSize(1)
.withMemoryMapping(false)
.build();
final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
ExecutorService executorService = newFixedThreadPool(100);
final AtomicInteger counter = new AtomicInteger();
try {
Callable<Void> concurrentWriteTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("blob-" + counter.getAndIncrement(), createBlob(nodeStore, 25 * 25));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
return null;
}
};
List<Future<?>> results = newArrayList();
for (int i = 0; i < 100; i++) {
results.add(executorService.submit(concurrentWriteTask));
}
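// Give the concurrent writers a head start before running cleanup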
Thread.sleep(100);
fileStore.cleanup();
for (Future<?> result : results) {
assertNull(result.get());
}
long reclaimedSize = fileStoreGCMonitor.getLastReclaimedSize();
assertEquals("Reclaimed size expected is 0, but instead was: " + reclaimedSize,
0, reclaimedSize);
} finally {
new ExecutorCloser(executorService).close();
fileStore.close();
new ExecutorCloser(scheduler).close();
}
}
@Test
public void randomAccessFileConcurrentReadAndLength() throws Exception {
final FileStore fileStore = fileStoreBuilder(getFileStoreFolder())
.withGCOptions(defaultGCOptions().setRetainedGenerations(2))
.withMaxFileSize(1)
.withMemoryMapping(false)
.build();
final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
ExecutorService executorService = newFixedThreadPool(300);
final AtomicInteger counter = new AtomicInteger();
try {
Callable<Void> concurrentWriteTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("blob-" + counter.getAndIncrement(), createBlob(nodeStore, 25 * 25));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
return null;
}
};
Callable<Void> concurrentCleanupTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
fileStore.cleanup();
return null;
}
};
Callable<Void> concurrentReferenceCollector = new Callable<Void>() {
@Override
public Void call() throws Exception {
fileStore.collectBlobReferences(s -> {
// Do nothing.
});
return null;
}
};
List<Future<?>> results = newArrayList();
for (int i = 0; i < 100; i++) {
results.add(executorService.submit(concurrentWriteTask));
results.add(executorService.submit(concurrentCleanupTask));
results.add(executorService.submit(concurrentReferenceCollector));
}
for (Future<?> result : results) {
assertNull(result.get());
}
} finally {
new ExecutorCloser(executorService).close();
fileStore.close();
}
}
/**
* Test asserting OAK-4700: Concurrent cleanup must not remove segments that are still reachable
*/
@Test
public void concurrentCleanup() throws Exception {
File fileStoreFolder = getFileStoreFolder();
final FileStore fileStore = fileStoreBuilder(fileStoreFolder)
.withMaxFileSize(1)
.build();
final SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
ExecutorService executorService = newFixedThreadPool(50);
final AtomicInteger counter = new AtomicInteger();
try {
Callable<Void> concurrentWriteTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("blob-" + counter.getAndIncrement(), createBlob(nodeStore, 512 * 512));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
return null;
}
};
final Callable<Void> concurrentCleanTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
fileStore.cleanup();
return null;
}
};
List<Future<?>> results = newArrayList();
for (int i = 0; i < 50; i++) {
if (i % 2 == 0) {
results.add(executorService.submit(concurrentWriteTask));
} else {
results.add(executorService.submit(concurrentCleanTask));
}
}
for (Future<?> result : results) {
assertNull(result.get());
}
for (String fileName : fileStoreFolder.list()) {
if (fileName.endsWith(".tar")) {
int pos = fileName.length() - "a.tar".length();
char generation = fileName.charAt(pos);
assertEquals("Expected nothing to be cleaned but generation '" + generation +
"' for file " + fileName + " indicates otherwise.",
"a", valueOf(generation));
}
}
} finally {
new ExecutorCloser(executorService).close();
fileStore.close();
}
}
private static void addContent(NodeBuilder builder) {
for (int k = 0; k < 10000; k++) {
builder.setProperty(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
}
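// Creates a blob store backed by a file data store so binaries are
// stored outside the segment store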
private static BlobStore newBlobStore(File directory) {
OakFileDataStore delegate = new OakFileDataStore();
delegate.setPath(directory.getAbsolutePath());
delegate.init(null);
return new DataStoreBlobStore(delegate);
}
@Test
public void binaryRetentionWithDS()
throws IOException, InvalidFileStoreVersionException, CommitFailedException {
try (FileStore fileStore = fileStoreBuilder(new File(getFileStoreFolder(), "segmentstore"))
.withBlobStore(newBlobStore(new File(getFileStoreFolder(), "blobstore")))
.withGCOptions(defaultGCOptions().setGcSizeDeltaEstimation(0))
.build())
{
SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
NodeBuilder builder = nodeStore.getRoot().builder();
builder.setProperty("bin", createBlob(nodeStore, 1000000));
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
fileStore.flush();
Set<String> expectedReferences = newHashSet();
fileStore.collectBlobReferences(expectedReferences::add);
for(int k = 1; k <= 3; k++) {
fileStore.fullGC();
Set<String> actualReferences = newHashSet();
fileStore.collectBlobReferences(actualReferences::add);
assertEquals("Binary should be retained after " + k + "-th gc cycle",
expectedReferences, actualReferences);
}
}
}
@Test
public void latestFullCompactedStateShouldNotBeDeleted() throws Exception {
SegmentGCOptions gcOptions = defaultGCOptions()
.setEstimationDisabled(true)
.setRetainedGenerations(2);
try (FileStore fileStore = fileStoreBuilder(getFileStoreFolder()).withGCOptions(gcOptions).build()) {
SegmentNodeState previousHead;
SegmentNodeState head = fileStore.getHead();
// Create a full, self consistent head state. This state will be the
// base for the following tail compactions. This increments the full generation.
fileStore.fullGC();
previousHead = head;
head = fileStore.getHead();
// retainedGeneration = 2 -> the full compacted head and the previous uncompacted head must
// still be available.
traverse(previousHead);
traverse(head);
// Create a tail head state on top of the previous full state. This
// increments the generation, but leaves the full generation untouched.
fileStore.tailGC();
previousHead = head;
head = fileStore.getHead();
// retainedGeneration = 2 -> the tail compacted head and the previous uncompacted head must
// still be available.
traverse(previousHead);
traverse(head);
// Create a tail state on top of the previous tail state. This increments the generation,
// but leaves the full generation untouched. This brings the generation two generations
// away from the latest full head state. Still, the full head state will not be deleted
// because doing so would generate an invalid repository at risk of SegmentNotFoundException.
fileStore.tailGC();
previousHead = head;
head = fileStore.getHead();
// retainedGeneration = 2 -> the tail compacted head and the previous uncompacted head must
// still be available.
traverse(previousHead);
traverse(head);
// Create a full, self consistent head state replacing the current chain of tail
// compacted heads.
fileStore.fullGC();
previousHead = head;
head = fileStore.getHead();
// retainedGeneration = 2 -> the full compacted head and the previous uncompacted head must
// still be available.
traverse(previousHead);
traverse(head);
}
}
}