/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import ch.qos.logback.classic.Level;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.mongodb.BasicDBObject;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.commons.FileIOUtils;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Tests for MongoMK blob garbage collection.
 */
public class MongoBlobGCTest extends AbstractMongoConnectionTest {
private Clock clock;
private static final Logger log = LoggerFactory.getLogger(MongoBlobGCTest.class);
@Rule
public TemporaryFolder folder = new TemporaryFolder(new File("target"));
public DataStoreState setUp(boolean deleteDirect) throws Exception {
return setUp(deleteDirect, 10);
}
@Override
protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) {
// Disable client session because this test modifies
// data directly in MongoDB.
return super.addToBuilder(mk)
.setClientSessionDisabled(true)
.setLeaseCheckMode(LeaseCheckMode.DISABLED);
}
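    /**
     * Creates {@code count} nodes with binary properties and removes a random
     * subset of them, either by deleting the node documents directly in MongoDB
     * ({@code deleteDirect}) or through the node store followed by a version GC
     * run. Records the chunk ids of all blobs added and of those expected to
     * survive blob GC.
     */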
public DataStoreState setUp(boolean deleteDirect, int count) throws Exception {
DocumentNodeStore s = mk.getNodeStore();
// ensure primary read preference for this test because we modify data
// directly in MongoDB without going through the MongoDocumentStore
MongoTestUtils.setReadPreference(s, ReadPreference.primary());
NodeBuilder a = s.getRoot().builder();
int number = count;
int maxDeleted = 5;
        // randomly pick the indices of the assets to be deleted
List<Integer> processed = Lists.newArrayList();
Random rand = new Random(47);
for (int i = 0; i < maxDeleted; i++) {
int n = rand.nextInt(number);
if (!processed.contains(n)) {
processed.add(n);
}
}
DataStoreState state = new DataStoreState();
for (int i = 0; i < number; i++) {
Blob b = s.createBlob(randomStream(i, 16516));
Iterator<String> idIter =
((GarbageCollectableBlobStore) s.getBlobStore())
.resolveChunks(b.toString());
while (idIter.hasNext()) {
String chunk = idIter.next();
state.blobsAdded.add(chunk);
if (!processed.contains(i)) {
state.blobsPresent.add(chunk);
}
}
a.child("c" + i).setProperty("x", b);
            // Add a duplicate reference to the same blob
if (i == 0) {
a.child("cdup").setProperty("x", b);
}
}
s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
if (deleteDirect) {
for (int id : processed) {
deleteFromMongo("c" + id);
}
} else {
a = s.getRoot().builder();
for (int id : processed) {
a.child("c" + id).remove();
s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
            long maxAge = 10; // minutes
            // Advance the clock past the GC age and run version GC so the
            // node documents removed above are actually deleted
            clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(maxAge));
            VersionGarbageCollector vGC = s.getVersionGarbageCollector();
            VersionGCStats stats = vGC.gc(0, TimeUnit.MILLISECONDS);
            assertEquals(processed.size(), stats.deletedDocGCCount);
}
return state;
}
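    /**
     * Records the chunk ids added to the blob store and the subset expected to
     * remain after garbage collection.
     */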
    private static class DataStoreState {
Set<String> blobsAdded = Sets.newHashSet();
Set<String> blobsPresent = Sets.newHashSet();
}
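    /**
     * Adds binaries small enough to be inlined into the blob id. Inlined
     * binaries add no chunks to the blob store, so the returned set is
     * intentionally empty.
     */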
private HashSet<String> addInlined() throws Exception {
HashSet<String> set = new HashSet<String>();
DocumentNodeStore s = mk.getNodeStore();
NodeBuilder a = s.getRoot().builder();
int number = 12;
for (int i = 0; i < number; i++) {
Blob b = s.createBlob(randomStream(i, 50));
a.child("cinline" + i).setProperty("x", b);
}
s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return set;
}
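    /**
     * Adds binaries under node names containing special characters and returns
     * their chunk ids, to verify that GC handles escaped paths correctly.
     */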
private HashSet<String> addNodeSpecialChars() throws Exception {
        List<String> specialCharSets =
                Lists.newArrayList("q\\%22afdg\\%22", "a\nbcd", "a\n\rabcd", "012\\efg");
DocumentNodeStore ds = mk.getNodeStore();
HashSet<String> set = new HashSet<String>();
NodeBuilder a = ds.getRoot().builder();
for (int i = 0; i < specialCharSets.size(); i++) {
Blob b = ds.createBlob(randomStream(i, 18432));
NodeBuilder n = a.child("cspecial");
n.child(specialCharSets.get(i)).setProperty("x", b);
Iterator<String> idIter =
((GarbageCollectableBlobStore) ds.getBlobStore())
.resolveChunks(b.toString());
set.addAll(Lists.newArrayList(idIter));
}
ds.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return set;
}
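    /**
     * Deletes a node document directly in MongoDB, bypassing the
     * DocumentNodeStore, so the blob reference disappears without a commit.
     */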
private void deleteFromMongo(String nodeId) {
MongoCollection<BasicDBObject> coll = mongoConnection.getDatabase()
.getCollection("nodes", BasicDBObject.class);
BasicDBObject blobNodeObj = new BasicDBObject();
blobNodeObj.put("_id", "1:/" + nodeId);
coll.deleteOne(blobNodeObj);
}
@Test
public void gcDirectMongoDelete() throws Exception {
DataStoreState state = setUp(true);
Set<String> existingAfterGC = gc(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void checkMark() throws Exception {
String rootFolder = folder.newFolder().getAbsolutePath();
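        // TRACE output is required here: it also makes the collector keep its
        // gc working directory, so the marked references file can be read below
        // (see the trace check in TestGarbageCollector#markAndSweep)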
LogCustomizer customLogs = LogCustomizer
.forLogger(MarkSweepGarbageCollector.class.getName())
.enable(Level.TRACE)
.filter(Level.TRACE)
.create();
DataStoreState state = setUp(true, 10);
log.info("{} blobs available : {}", state.blobsPresent.size(), state.blobsPresent);
customLogs.starting();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(0, executor, rootFolder);
gcObj.collectGarbage(true);
customLogs.finished();
assertBlobReferences(state.blobsPresent, rootFolder);
}
@Test
public void gcSpecialChar() throws Exception {
DataStoreState state = setUp(true);
Set<String> specialCharNodeBlobs = addNodeSpecialChars();
state.blobsAdded.addAll(specialCharNodeBlobs);
state.blobsPresent.addAll(specialCharNodeBlobs);
Set<String> existingAfterGC = gc(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void noGc() throws Exception {
DataStoreState state = setUp(true);
Set<String> existingAfterGC = gc(86400);
assertTrue(Sets.symmetricDifference(state.blobsAdded, existingAfterGC).isEmpty());
}
@Test
public void gcVersionDelete() throws Exception {
DataStoreState state = setUp(false);
Set<String> existingAfterGC = gc(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void gcDirectMongoDeleteWithInlined() throws Exception {
DataStoreState state = setUp(true);
addInlined();
Set<String> existingAfterGC = gc(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void gcVersionDeleteWithInlined() throws Exception {
DataStoreState state = setUp(false);
addInlined();
Set<String> existingAfterGC = gc(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
@Test
public void consistencyCheckInlined() throws Exception {
DataStoreState state = setUp(true);
addInlined();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(0, candidates);
}
@Test
public void consistencyCheckInit() throws Exception {
DataStoreState state = setUp(true);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(0, candidates);
}
@Test
public void consistencyCheckWithGc() throws Exception {
DataStoreState state = setUp(true);
Set<String> existingAfterGC = gc(0);
assertTrue("blobsAdded: " + state.blobsAdded +
", blobsPresent: " + state.blobsPresent +
", existingAfterGC: " + existingAfterGC,
Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(0, candidates);
}
@Test
public void consistencyCheckWithRenegadeDelete() throws Exception {
DataStoreState state = setUp(true);
// Simulate faulty state by deleting some blobs directly
Random rand = new Random(87);
List<String> existing = Lists.newArrayList(state.blobsPresent);
GarbageCollectableBlobStore store = (GarbageCollectableBlobStore)
mk.getNodeStore().getBlobStore();
        long count = store.countDeleteChunks(
                ImmutableList.of(existing.get(rand.nextInt(existing.size()))), 0);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor);
long candidates = gcObj.checkConsistency();
assertEquals(1, executor.getTaskCount());
assertEquals(count, candidates);
}
// OAK-3390
@Test
public void referencedBlobs() throws Exception {
Iterator<ReferencedBlob> blobs = mk.getNodeStore().getReferencedBlobsIterator();
assertTrue(blobs instanceof MongoBlobReferenceIterator);
}
@Test
public void gcLongRunningBlobCollection() throws Exception {
DataStoreState state = setUp(true);
log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
DocumentNodeStore store = mk.getNodeStore();
String repoId = null;
if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
repoId = ClusterRepositoryInfo.getOrCreateId(store);
((SharedDataStore) store.getBlobStore()).setRepositoryId(repoId);
}
TestGarbageCollector gc =
new TestGarbageCollector(new DocumentBlobReferenceRetriever(store),
(GarbageCollectableBlobStore) store.getBlobStore(), executor,
folder.newFolder().getAbsolutePath(), 5, 5000, repoId);
gc.collectGarbage(false);
Set<String> existingAfterGC = iterate();
log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
}
@Test
public void checkGcPathLogging() throws Exception {
String rootFolder = folder.newFolder().getAbsolutePath();
LogCustomizer customLogs = LogCustomizer
.forLogger(MarkSweepGarbageCollector.class.getName())
.enable(Level.TRACE)
.filter(Level.TRACE)
.create();
setUp(false);
customLogs.starting();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(0, executor, rootFolder);
gcObj.collectGarbage(true);
customLogs.finished();
assertBlobReferenceRecords(1, rootFolder);
}
@Test
public void checkConsistencyPathLogging() throws Exception {
String rootFolder = folder.newFolder().getAbsolutePath();
LogCustomizer customLogs = LogCustomizer
.forLogger(MarkSweepGarbageCollector.class.getName())
.enable(Level.TRACE)
.filter(Level.TRACE)
.create();
setUp(false);
customLogs.starting();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gcObj = init(86400, executor, rootFolder);
gcObj.checkConsistency();
customLogs.finished();
assertBlobReferenceRecords(2, rootFolder);
}
    private static void assertBlobReferences(Set<String> expected, String rootFolder) throws IOException {
        try (InputStream is = new FileInputStream(getMarkedFile(rootFolder))) {
            Set<String> records = FileIOUtils.readStringsAsSet(is, true);
            assertEquals(expected, records);
        }
    }
    private static void assertBlobReferenceRecords(int expected, String rootFolder) throws IOException {
        try (InputStream is = new FileInputStream(getMarkedFile(rootFolder))) {
            Set<String> records = FileIOUtils.readStringsAsSet(is, true);
            for (String rec : records) {
                assertEquals(expected, Splitter.on(",").omitEmptyStrings().splitToList(rec).size());
            }
        }
    }
private static File getMarkedFile(String rootFolder) {
        // Locate the marked references file inside the gcworkdir-* directory
        // created by the collector
File root = new File(rootFolder);
List<File> rootFile = FileFilterUtils.filterList(
FileFilterUtils.prefixFileFilter("gcworkdir-"),
root.listFiles());
List<File> markedFiles = FileFilterUtils.filterList(
FileFilterUtils.prefixFileFilter("marked-"),
rootFile.get(0).listFiles());
return markedFiles.get(0);
}
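    /**
     * Runs blob garbage collection with the given max age and returns the
     * chunk ids that remain in the blob store afterwards.
     */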
private Set<String> gc(int blobGcMaxAgeInSecs) throws Exception {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = init(blobGcMaxAgeInSecs, executor);
gc.collectGarbage(false);
assertEquals(0, executor.getTaskCount());
return iterate();
}
private MarkSweepGarbageCollector init(int blobGcMaxAgeInSecs, ThreadPoolExecutor executor) throws Exception {
return init(blobGcMaxAgeInSecs, executor, null);
}
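    /**
     * Creates a MarkSweepGarbageCollector for the test blob store, registering
     * a repository id first if the data store is shared. Uses a fresh work
     * directory when {@code root} is empty.
     */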
private MarkSweepGarbageCollector init(int blobGcMaxAgeInSecs, ThreadPoolExecutor executor,
String root) throws Exception {
DocumentNodeStore store = mk.getNodeStore();
String repoId = null;
if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
repoId = ClusterRepositoryInfo.getOrCreateId(store);
((SharedDataStore) store.getBlobStore()).setRepositoryId(repoId);
}
if (Strings.isNullOrEmpty(root)) {
root = folder.newFolder().getAbsolutePath();
}
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new DocumentBlobReferenceRetriever(store),
(GarbageCollectableBlobStore) store.getBlobStore(), executor, root, 5, blobGcMaxAgeInSecs, repoId);
return gc;
}
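    /**
     * Returns the ids of all chunks currently stored in the blob store.
     */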
protected Set<String> iterate() throws Exception {
GarbageCollectableBlobStore store = (GarbageCollectableBlobStore)
mk.getNodeStore().getBlobStore();
Iterator<String> cur = store.getAllChunkIds(0);
Set<String> existing = Sets.newHashSet();
while (cur.hasNext()) {
existing.add(cur.next());
}
return existing;
}
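    /**
     * Returns a stream of {@code size} pseudo-random bytes, deterministic for
     * a given {@code seed}.
     */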
static InputStream randomStream(int seed, int size) {
Random r = new Random(seed);
byte[] data = new byte[size];
r.nextBytes(data);
return new ByteArrayInputStream(data);
}
@Override
protected Clock getTestClock() throws InterruptedException {
clock = new Clock.Virtual();
clock.waitUntil(Revision.getCurrentTimestamp());
return clock;
}
    /**
     * Garbage collector that waits between the mark and sweep phases and adds
     * additional blobs after the references have been identified, simulating a
     * long-running blob id collection phase.
     */
class TestGarbageCollector extends MarkSweepGarbageCollector {
long maxLastModifiedInterval;
String root;
GarbageCollectableBlobStore blobStore;
Set<String> additionalBlobs;
public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
Executor executor, String root, int batchCount, long maxLastModifiedInterval,
@Nullable String repositoryId) throws IOException {
super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
this.root = root;
this.blobStore = blobStore;
this.maxLastModifiedInterval = maxLastModifiedInterval;
this.additionalBlobs = Sets.newHashSet();
}
@Override
protected void markAndSweep(boolean markOnly, boolean forceBlobRetrieve) throws Exception {
boolean threw = true;
GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
try {
Stopwatch sw = Stopwatch.createStarted();
LOG.info("Starting Test Blob garbage collection");
                // Sleep a little longer than maxLastModifiedInterval so the
                // existing blobs become old enough to be considered by the sweep
Thread.sleep(maxLastModifiedInterval + 1000);
LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 1000);
long markStart = System.currentTimeMillis();
mark(fs);
LOG.info("Mark finished");
additionalBlobs = createAdditional();
if (!markOnly) {
Thread.sleep(maxLastModifiedInterval + 100);
LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
long deleteCount = sweep(fs, markStart, forceBlobRetrieve);
threw = false;
LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
deleteCount, maxLastModifiedInterval);
}
} finally {
if (!LOG.isTraceEnabled()) {
Closeables.close(fs, threw);
}
}
}
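        /**
         * Adds more binaries after the mark phase has completed, simulating
         * writes that happen while a long-running collection is in progress.
         */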
public HashSet<String> createAdditional() throws Exception {
HashSet<String> blobSet = new HashSet<String>();
DocumentNodeStore s = mk.getNodeStore();
NodeBuilder a = s.getRoot().builder();
int number = 5;
for (int i = 0; i < number; i++) {
Blob b = s.createBlob(randomStream(100 + i, 16516));
a.child("cafter" + i).setProperty("x", b);
Iterator<String> idIter =
((GarbageCollectableBlobStore) s.getBlobStore())
.resolveChunks(b.toString());
while (idIter.hasNext()) {
String chunk = idIter.next();
blobSet.add(chunk);
}
}
log.info("{} Additional created {}", blobSet.size(), blobSet);
s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return blobSet;
}
}
}