/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment;

import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions;
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import ch.qos.logback.classic.Level;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.FileIOUtils;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests for SegmentNodeStore DataStore GC
*/
public class SegmentDataStoreBlobGCIT {
private static final Logger log = LoggerFactory.getLogger(SegmentDataStoreBlobGCIT.class);
private static InputStream randomStream(int seed, int size) {
Random r = new Random(seed);
byte[] data = new byte[size];
r.nextBytes(data);
return new ByteArrayInputStream(data);
}
private SegmentNodeStore nodeStore;
private FileStore store;
private DataStoreBlobStore blobStore;
private SegmentGCOptions gcOptions = defaultGCOptions();
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
@After
public void closeFileStore() throws Exception {
if (store != null) {
store.close();
}
}
@After
public void closeBlobStore() throws Exception {
if (blobStore != null) {
blobStore.close();
}
}
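
/**
 * Lazily builds a {@link FileStore} on the work directory backed by the
 * given blob store and wraps it in a {@link SegmentNodeStore}; subsequent
 * calls reuse the same instance.
 */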
private SegmentNodeStore getNodeStore(BlobStore blobStore) throws Exception {
if (nodeStore == null) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
FileStoreBuilder builder = fileStoreBuilder(getWorkDir())
.withNodeDeduplicationCacheSize(16384)
.withBlobStore(blobStore)
.withMaxFileSize(256)
.withMemoryMapping(false)
.withStatisticsProvider(new DefaultStatisticsProvider(executor))
.withGCOptions(gcOptions);
store = builder.build();
nodeStore = SegmentNodeStoreBuilders.builder(store).build();
}
return nodeStore;
}
private File getWorkDir() {
return folder.getRoot();
}
public DataStoreState setUp() throws Exception {
return setUp(10);
}
protected DataStoreBlobStore getBlobStore(File folder) throws Exception {
return DataStoreUtils.getBlobStore(folder);
}
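
/**
 * Populates the repository and returns the expected data store state:
 * creates 500 nodes with small (16 KB) blob properties that stay inlined
 * and never reach the data store, removes some of them to produce segment
 * garbage, then adds {@code count} larger (18342 byte) data store blobs
 * referenced from nodes, deletes a random subset of those nodes, and
 * finally compacts past the retained generations and runs cleanup so the
 * deleted references become collectable.
 */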
public DataStoreState setUp(int count) throws Exception {
if (blobStore == null) {
blobStore = getBlobStore(folder.newFolder());
}
nodeStore = getNodeStore(blobStore);
NodeBuilder a = nodeStore.getRoot().builder();
/* 1. Create garbage by creating in-lined blobs (16 KB, below the data store minimum record length) */
int number = 500;
NodeBuilder content = a.child("content");
for (int i = 0; i < number; i++) {
NodeBuilder c = content.child("x" + i);
for (int j = 0; j < 5; j++) {
c.setProperty("p" + j, nodeStore.createBlob(randomStream(j, 16384)));
}
}
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final long dataSize = store.getStats().getApproximateSize();
log.info("File store dataSize {}", byteCountToDisplaySize(dataSize));
// 2. Now remove the blob properties to generate garbage
content = a.child("content");
for (int i = 0; i < 100; i++) {
NodeBuilder c = content.child("x" + i);
for (int j = 0; j < 5; j++) {
c.removeProperty("p" + j);
}
}
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
/* Create and delete nodes with blobs stored in DS*/
int maxDeleted = 5;
int numBlobs = count;
List<Integer> processed = Lists.newArrayList();
Random rand = new Random();
for (int i = 0; i < maxDeleted; i++) {
int n = rand.nextInt(numBlobs);
if (!processed.contains(n)) {
processed.add(n);
}
}
DataStoreState state = new DataStoreState();
for (int i = 0; i < numBlobs; i++) {
SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(i, 18342));
Iterator<String> idIter = blobStore.resolveChunks(b.getBlobId());
while (idIter.hasNext()) {
String chunk = idIter.next();
state.blobsAdded.add(chunk);
if (!processed.contains(i)) {
state.blobsPresent.add(chunk);
}
}
a.child("c" + i).setProperty("x", b);
}
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Created blobs : {}", state.blobsAdded.size());
for (int id : processed) {
delete("c" + id);
}
log.info("Deleted nodes : {}", processed.size());
// Sleep a little so the deletions become eligible for cleanup
TimeUnit.MILLISECONDS.sleep(5);
// Ensure cleanup is efficient by surpassing the number of
// retained generations
for (int k = 0; k < gcOptions.getRetainedGenerations(); k++) {
store.compactFull();
}
store.cleanup();
return state;
}
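
/**
 * Adds four 16514 byte blobs, which stay below the 16516 byte minimum
 * record length configured by the inlined-blob tests, so they are inlined
 * into the blob id and create no data store chunks; the returned set is
 * therefore intentionally empty.
 */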
private HashSet<String> addInlined() throws Exception {
HashSet<String> set = new HashSet<String>();
NodeBuilder a = nodeStore.getRoot().builder();
int number = 4;
for (int i = 0; i < number; i++) {
Blob b = nodeStore.createBlob(randomStream(i, 16514));
a.child("cinline" + i).setProperty("x", b);
}
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return set;
}
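
/**
 * Adds data store blobs under node names containing characters that need
 * escaping in the blob reference files and returns their chunk ids.
 */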
private HashSet<String> addNodeSpecialChars() throws Exception {
List<String> specialCharSets =
Lists.newArrayList("q\\%22afdg\\%22", "a\nbcd", "a\n\rabcd", "012\\efg" );
HashSet<String> set = new HashSet<String>();
NodeBuilder a = nodeStore.getRoot().builder();
for (int i = 0; i < specialCharSets.size(); i++) {
SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(i, 18432));
NodeBuilder n = a.child("cspecial");
n.child(specialCharSets.get(i)).setProperty("x", b);
Iterator<String> idIter = blobStore.resolveChunks(b.getBlobId());
set.addAll(Lists.newArrayList(idIter));
}
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return set;
}
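
/**
 * Expected data store contents: {@code blobsAdded} holds every chunk
 * created, {@code blobsPresent} only those that should survive garbage
 * collection.
 */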
private static class DataStoreState {
Set<String> blobsAdded = Sets.newHashSet();
Set<String> blobsPresent = Sets.newHashSet();
}
private void delete(String nodeId) throws CommitFailedException {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.child(nodeId).remove();
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
@Test
public void gc() throws Exception {
DataStoreState state = setUp();
log.info("{} blobs that should remain after gc : {}", state.blobsPresent.size(), state.blobsPresent);
log.info("{} blobs for nodes which are deleted : {}", state.blobsPresent.size(), state.blobsPresent);
Set<String> existingAfterGC = gcInternal(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void checkMark() throws Exception {
LogCustomizer customLogs = LogCustomizer
.forLogger(MarkSweepGarbageCollector.class.getName())
.enable(Level.TRACE)
.filter(Level.TRACE)
.create();
DataStoreState state = setUp(10);
log.info("{} blobs available : {}", state.blobsPresent.size(), state.blobsPresent);
customLogs.starting();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
String rootFolder = folder.newFolder().getAbsolutePath();
MarkSweepGarbageCollector gcObj = init(0, executor, rootFolder);
gcObj.collectGarbage(true);
customLogs.finished();
assertBlobReferenceRecords(state.blobsPresent, rootFolder);
}
@Test
public void noGc() throws Exception {
DataStoreState state = setUp();
log.info("{} blobs that should remain after gc : {}", state.blobsAdded.size(), state.blobsAdded);
log.info("{} blobs for nodes which are deleted : {}", state.blobsPresent.size(), state.blobsPresent);
Set<String> existingAfterGC = gcInternal(86400);
assertTrue(Sets.symmetricDifference(state.blobsAdded, existingAfterGC).isEmpty());
}
@Test
public void gcSpecialChar() throws Exception {
DataStoreState state = setUp();
Set<String> specialCharNodeBlobs = addNodeSpecialChars();
state.blobsAdded.addAll(specialCharNodeBlobs);
state.blobsPresent.addAll(specialCharNodeBlobs);
Set<String> existingAfterGC = gcInternal(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void consistencyCheckInit() throws Exception {
DataStoreState state = setUp();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(0, candidates);
}
@Test
public void consistencyCheckWithGc() throws Exception {
DataStoreState state = setUp();
Set<String> existingAfterGC = gcInternal(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(0, candidates);
}
@Test
public void consistencyCheckWithRenegadeDelete() throws Exception {
DataStoreState state = setUp();
// Simulate faulty state by deleting some blobs directly
Random rand = new Random(87);
List<String> existing = Lists.newArrayList(state.blobsPresent);
long count = blobStore.countDeleteChunks(ImmutableList.of(existing.get(rand.nextInt(existing.size()))), 0);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(count, candidates);
}
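
/**
 * Verifies that blobs added while blob id collection is still running are
 * not swept: {@link TestGarbageCollector} creates additional referenced
 * blobs after the mark phase, so afterwards exactly the original live blobs
 * plus those additional blobs must remain.
 */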
@Test
public void gcLongRunningBlobCollection() throws Exception {
DataStoreState state = setUp();
log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
String repoId = null;
if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
((SharedDataStore) store.getBlobStore()).addMetadataRecord(
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
TestGarbageCollector gc = new TestGarbageCollector(
new SegmentBlobReferenceRetriever(store),
(GarbageCollectableBlobStore) store.getBlobStore(), executor, folder.newFolder().getAbsolutePath(), 5, 5000, repoId);
gc.collectGarbage(false);
Set<String> existingAfterGC = iterate();
log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
}
@Test
public void gcWithInlined() throws Exception {
blobStore = new DataStoreBlobStore(DataStoreUtils.createFDS(new File(getWorkDir(), "datastore"), 16516));
DataStoreState state = setUp();
addInlined();
log.info("{} blobs that should remain after gc : {}", state.blobsAdded.size(), state.blobsAdded);
log.info("{} blobs for nodes which are deleted : {}", state.blobsPresent.size(), state.blobsPresent);
Set<String> existingAfterGC = gcInternal(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void consistencyCheckInlined() throws Exception {
blobStore = new DataStoreBlobStore(DataStoreUtils.createFDS(new File(getWorkDir(), "datastore"), 16516));
DataStoreState state = setUp();
addInlined();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(0, candidates);
}
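
/**
 * Runs a full mark and sweep with the given max blob age: 0 makes every
 * unreferenced blob eligible for deletion, while a large value such as
 * 86400 seconds protects everything the test just created. Returns the
 * chunk ids still present in the data store afterwards.
 */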
private Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = init(maxBlobGcInSecs, executor);
gc.collectGarbage(false);
assertEquals(0, executor.getTaskCount());
Set<String> existingAfterGC = iterate();
log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC);
return existingAfterGC;
}
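
/**
 * Locates the gcworkdir-* directory created by the collector, reads the
 * marked-* file inside it and asserts that it records exactly the expected
 * blob references.
 */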
private static void assertBlobReferenceRecords(Set<String> expected, String rootFolder) throws IOException {
// Read the marked file to check whether the blob references were recorded
File root = new File(rootFolder);
List<File> rootFile = FileFilterUtils.filterList(
FileFilterUtils.prefixFileFilter("gcworkdir-"),
root.listFiles());
List<File> markedFiles = FileFilterUtils.filterList(
FileFilterUtils.prefixFileFilter("marked-"),
rootFile.get(0).listFiles());
try (InputStream is = new FileInputStream(markedFiles.get(0))) {
Set<String> records = FileIOUtils.readStringsAsSet(is, true);
assertEquals(expected.size(), records.size());
assertEquals(expected, records);
} finally {
FileUtils.forceDelete(rootFile.get(0));
}
}
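
/**
 * Builds a {@link MarkSweepGarbageCollector} on the file store, first
 * registering a repository id if the data store is shared, since the mark
 * phase requires it.
 */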
private MarkSweepGarbageCollector init(long blobGcMaxAgeInSecs, ThreadPoolExecutor executor)
throws Exception {
return init(blobGcMaxAgeInSecs, executor, folder.newFolder().getAbsolutePath());
}
private MarkSweepGarbageCollector init(long blobGcMaxAgeInSecs, ThreadPoolExecutor executor,
String root) throws Exception {
String repoId = null;
if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
((SharedDataStore) store.getBlobStore()).addMetadataRecord(
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
MarkSweepGarbageCollector gc =
new MarkSweepGarbageCollector(new SegmentBlobReferenceRetriever(store),
(GarbageCollectableBlobStore) store.getBlobStore(), executor,
root, 2048, blobGcMaxAgeInSecs, repoId);
return gc;
}
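
/**
 * Returns all chunk ids currently present in the data store.
 */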
private Set<String> iterate() throws Exception {
Iterator<String> cur = blobStore.getAllChunkIds(0);
Set<String> existing = Sets.newHashSet();
while (cur.hasNext()) {
existing.add(cur.next());
}
return existing;
}
/**
* Waits for some time and adds additional blobs after the referenced blob
* ids have been identified, to simulate a long-running blob id collection
* phase.
*/
private class TestGarbageCollector extends MarkSweepGarbageCollector {
private long maxLastModifiedInterval;
private String root;
private GarbageCollectableBlobStore blobStore;
private Set<String> additionalBlobs;
TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
Executor executor, String root, int batchCount, long maxLastModifiedInterval,
@Nullable String repositoryId) throws IOException {
super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
this.root = root;
this.blobStore = blobStore;
this.maxLastModifiedInterval = maxLastModifiedInterval;
this.additionalBlobs = Sets.newHashSet();
}
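
/**
 * Ages the initial blobs past {@code maxLastModifiedInterval}, runs the
 * mark phase, then creates additional referenced blobs and ages those as
 * well before sweeping. The sweep must still spare the additional blobs
 * because they were created after the mark phase started.
 */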
@Override
protected void markAndSweep(boolean markOnly, boolean forceBlobRetrieve) throws Exception {
try (GarbageCollectorFileState fs = new GarbageCollectorFileState(root)) {
Stopwatch sw = Stopwatch.createStarted();
LOG.info("Starting Test Blob garbage collection");
// Sleep a little longer than the max modified interval so the valid blobs age past it
Thread.sleep(maxLastModifiedInterval + 100);
LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
long markStart = System.currentTimeMillis();
mark(fs);
LOG.info("Mark finished");
additionalBlobs = createAdditional();
if (!markOnly) {
Thread.sleep(maxLastModifiedInterval + 100);
LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
long deleteCount = sweep(fs, markStart, forceBlobRetrieve);
LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
deleteCount, maxLastModifiedInterval);
}
}
}
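
/**
 * Adds five more data store blobs, referenced from new nodes, and returns
 * their chunk ids.
 */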
private HashSet<String> createAdditional() throws Exception {
HashSet<String> blobSet = new HashSet<String>();
NodeBuilder a = nodeStore.getRoot().builder();
int number = 5;
for (int i = 0; i < number; i++) {
SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(100 + i, 16516));
a.child("cafter" + i).setProperty("x", b);
Iterator<String> idIter = blobStore.resolveChunks(b.getBlobId());
while (idIter.hasNext()) {
String chunk = idIter.next();
blobSet.add(chunk);
}
}
log.info("{} Additional created {}", blobSet.size(), blobSet);
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return blobSet;
}
}
}