blob: f9bd42b11dfe9e88067cca6741471566654d9c4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.slf4j.helpers.MessageFormatter.arrayFormat;
public class VersionGCTest {
// JUnit rule that supplies the DocumentMK builder used in setUp()
@Rule
public final DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
// executor on which gc() runs a garbage collection in the background
private ExecutorService execService;
// in-memory store whose query() calls can be blocked via its semaphore
private TestStore store = new TestStore();
// node store under test; created in setUp() on top of 'store'
private DocumentNodeStore ns;
// garbage collector under test; obtained from 'ns' in setUp()
private VersionGarbageCollector gc;
/**
 * Prepares the fixture: a node store backed by the test store and driven by
 * a virtual clock, with one node ("foo") that has been created and removed
 * again one (virtual) hour before the test starts.
 */
@Before
public void setUp() throws Exception {
    execService = Executors.newCachedThreadPool();

    // virtual clock, initially moved to the current wall-clock time
    Clock virtualClock = new Clock.Virtual();
    virtualClock.waitUntil(System.currentTimeMillis());
    Revision.setClock(virtualClock);

    ns = builderProvider.newBuilder()
            .clock(virtualClock)
            .setLeaseCheckMode(LeaseCheckMode.DISABLED)
            .setDocumentStore(store)
            .setAsyncDelay(0)
            .getNodeStore();

    // test content: a node that is created and removed right away
    createNode("foo");
    removeNode("foo");

    // let one hour pass on the virtual clock
    long oneHourLater = virtualClock.getTime() + HOURS.toMillis(1);
    virtualClock.waitUntil(oneHourLater);

    gc = ns.getVersionGarbageCollector();
}
/**
 * Shuts the background executor down and waits up to one minute for
 * submitted tasks to complete.
 */
@After
public void tearDown() throws Exception {
    execService.shutdown();
    // the boolean result is intentionally not checked
    execService.awaitTermination(1, TimeUnit.MINUTES);
}
/**
 * Undoes the {@code Revision.setClock(...)} call made in {@code setUp()}
 * once all tests of this class have run.
 */
@AfterClass
public static void resetClock() {
    Revision.resetClockToDefault();
}
/**
 * While one GC run is blocked inside the store, a second call to
 * {@code gc()} on the same collector must fail with an
 * {@link IOException} whose message contains "already running".
 */
@Test
public void failParallelGC() throws Exception {
    // hold the only permit so the background GC blocks in TestStore.query()
    store.semaphore.acquireUninterruptibly();
    Future<VersionGCStats> stats = gc();
    try {
        // wait (up to ~1s) until the background GC is queued on the semaphore
        boolean gcBlocked = false;
        for (int i = 0; i < 10; i++) {
            if (store.semaphore.hasQueuedThreads()) {
                gcBlocked = true;
                break;
            }
            Thread.sleep(100);
        }
        assertTrue(gcBlocked);
        // now try to trigger another GC
        try {
            gc.gc(30, TimeUnit.MINUTES);
            fail("must throw an IOException");
        } catch (IOException e) {
            assertTrue(e.getMessage().contains("already running"));
        }
    } finally {
        // always unblock the background GC, even when an assertion above
        // failed; otherwise its thread would wait for the permit forever
        store.semaphore.release();
        stats.get();
    }
}
/**
 * Cancelling a GC run that is blocked inside the store must result in
 * stats flagged as canceled once the run is allowed to continue.
 */
@Test
public void cancel() throws Exception {
    // hold the only permit so the background GC blocks in TestStore.query()
    store.semaphore.acquireUninterruptibly();
    Future<VersionGCStats> stats = gc();
    try {
        // wait (up to ~1s) until the background GC is queued on the semaphore
        boolean gcBlocked = false;
        for (int i = 0; i < 10; i++) {
            if (store.semaphore.hasQueuedThreads()) {
                gcBlocked = true;
                break;
            }
            Thread.sleep(100);
        }
        assertTrue(gcBlocked);
        // now cancel the GC
        gc.cancel();
    } finally {
        // always hand the permit back, even when the assertion above failed;
        // otherwise the background thread would wait for it forever
        store.semaphore.release();
    }
    assertTrue(stats.get().canceled);
}
/**
 * A canceled GC run must leave the {@code lastOldestTimeStamp} property of
 * the {@code versionGC} document in the SETTINGS collection as it was
 * before the run (or leave the document absent if it did not exist).
 */
@Test
public void cancelMustNotUpdateLastOldestTimeStamp() throws Exception {
    // get previous entry from SETTINGS
    String versionGCId = "versionGC";
    String lastOldestTimeStampProp = "lastOldestTimeStamp";
    Document statusBefore = store.find(Collection.SETTINGS, versionGCId);
    // hold the only permit so the background GC blocks in TestStore.query()
    store.semaphore.acquireUninterruptibly();
    Future<VersionGCStats> stats = gc();
    try {
        // wait (up to ~1s) until the background GC is queued on the semaphore
        boolean gcBlocked = false;
        for (int i = 0; i < 10; i++) {
            if (store.semaphore.hasQueuedThreads()) {
                gcBlocked = true;
                break;
            }
            Thread.sleep(100);
        }
        assertTrue(gcBlocked);
        // now cancel the GC
        gc.cancel();
    } finally {
        // always hand the permit back, even when the assertion above failed;
        // otherwise the background thread would wait for it forever
        store.semaphore.release();
    }
    assertTrue(stats.get().canceled);
    // ensure a canceled GC doesn't update that versionGC SETTINGS entry
    Document statusAfter = store.find(Collection.SETTINGS, versionGCId);
    if (statusBefore == null) {
        assertNull(statusAfter);
    } else {
        assertNotNull(statusAfter);
        assertEquals(
                "canceled GC shouldn't change the " + lastOldestTimeStampProp + " property on " + versionGCId
                        + " settings entry",
                statusBefore.get(lastOldestTimeStampProp), statusAfter.get(lastOldestTimeStampProp));
    }
}
/**
 * Smoke test: {@code getInfo()} must not throw when called after a
 * GC run. No result is inspected.
 */
@Test
public void getInfo() throws Exception {
    final long maxRevisionAge = 1;
    gc.gc(maxRevisionAge, HOURS);
    gc.getInfo(maxRevisionAge, HOURS);
}
/**
 * A GC run must report its status changes to the registered monitor in
 * exactly the expected order.
 */
@Test
public void gcMonitorStatusUpdates() throws Exception {
    // status sequence the monitor is expected to receive
    List<String> expectedStatuses = Lists.newArrayList(
            "INITIALIZING",
            "COLLECTING",
            "CHECKING",
            "COLLECTING",
            "DELETING",
            "SORTING",
            "DELETING",
            "UPDATING",
            "SPLITS_CLEANUP",
            "IDLE");

    TestGCMonitor recorder = new TestGCMonitor();
    gc.setGCMonitor(recorder);
    gc.gc(30, TimeUnit.MINUTES);

    assertEquals(expectedStatuses, recorder.getStatusMessages());
}
/**
 * A GC run must send exactly three info messages to the registered
 * monitor, each starting with the expected text.
 */
@Test
public void gcMonitorInfoMessages() throws Exception {
    TestGCMonitor recorder = new TestGCMonitor();
    gc.setGCMonitor(recorder);
    gc.gc(2, TimeUnit.HOURS);

    List<String> messages = recorder.getInfoMessages();
    assertEquals(3, messages.size());

    String first = messages.get(0);
    String second = messages.get(1);
    String third = messages.get(2);
    assertTrue(first.startsWith("Start "));
    assertTrue(second.startsWith("Looking at revisions"));
    assertTrue(third.startsWith("Revision garbage collection finished"));
}
/**
 * A single GC run must look up the "versionGC" settings document exactly
 * once, as counted by the test store.
 */
@Test
public void findVersionGC() throws Exception {
    // start counting from zero
    store.findVersionGC.set(0);

    gc.gc(1, TimeUnit.HOURS);

    // must only read once
    long lookups = store.findVersionGC.get();
    assertEquals(1, lookups);
}
/**
 * Fakes a backlog whose oldest deleted document is one year old and whose
 * size is twelve times the collect limit. The recommended scope must then
 * be roughly one month; after a run that reports {@code limitExceeded},
 * a fresh recommendation must use half of that duration.
 */
@Test
public void recommendationsOnHugeBacklog() throws Exception {
    VersionGCOptions options = gc.getOptions();
    final long oneYearAgo = ns.getClock().getTime() - TimeUnit.DAYS.toMillis(365);
    final long twelveTimesTheLimit = options.collectLimit * 12;
    // one day expressed in milliseconds
    final long oneDayMs = TimeUnit.DAYS.toMillis(1);
    VersionGCSupport localgcsupport = fakeVersionGCSupport(ns.getDocumentStore(), oneYearAgo, twelveTimesTheLimit);
    VersionGCRecommendations rec = new VersionGCRecommendations(oneDayMs, ns.getCheckpoints(), ns.getClock(), localgcsupport,
            options, new TestGCMonitor());
    // should select a duration of roughly one month
    long duration = rec.scope.getDurationMs();
    assertTrue(duration <= TimeUnit.DAYS.toMillis(33));
    assertTrue(duration >= TimeUnit.DAYS.toMillis(28));
    // pretend the run hit the collect limit
    VersionGCStats stats = new VersionGCStats();
    stats.limitExceeded = true;
    rec.evaluate(stats);
    assertTrue(stats.needRepeat);
    rec = new VersionGCRecommendations(oneDayMs, ns.getCheckpoints(), ns.getClock(), localgcsupport, options,
            new TestGCMonitor());
    // new duration should be half
    long nduration = rec.scope.getDurationMs();
    // assertEquals reports both values on failure
    assertEquals(duration / 2, nduration);
}
// OAK-8448: test that after shrinking the scope to the minimum and after
// successful runs, scope will be expanded again
@Test
public void expandIntervalAgain() throws Exception {
    final VersionGCOptions options = gc.getOptions();
    final GCMonitor monitor = new TestGCMonitor();
    final int days = 365;
    final long oneDayMs = TimeUnit.DAYS.toMillis(1);

    // fake backlog: one deleted document per second over the last 'days' days
    long oldestDeleted = ns.getClock().getTime() - TimeUnit.DAYS.toMillis(days);
    long deletedCount = TimeUnit.DAYS.toSeconds(days);
    VersionGCSupport support = fakeVersionGCSupport(ns.getDocumentStore(), oldestDeleted, deletedCount);

    // phase 1: report "limit exceeded" until the suggested interval is
    // no longer above the configured precision
    VersionGCRecommendations rec;
    VersionGCStats stats;
    do {
        rec = new VersionGCRecommendations(oneDayMs, ns.getCheckpoints(), ns.getClock(), support, options,
                monitor);
        stats = new VersionGCStats();
        stats.limitExceeded = true;
        rec.evaluate(stats);
        assertTrue(stats.needRepeat);
    } while (rec.suggestedIntervalMs > options.precisionMs);

    // phase 2: report successful runs (1 node/sec of the scope deleted)
    // until no repeat is requested or the iteration budget is used up
    final int maxIterations = 1000;
    for (int iteration = 1; ; iteration++) {
        long scopeDurationMs = rec.scope.getDurationMs();
        oldestDeleted = rec.scope.fromMs + scopeDurationMs;
        int deleted = (int) (rec.scope.getDurationMs() / TimeUnit.SECONDS.toMillis(1));
        deletedCount -= deleted;
        support = fakeVersionGCSupport(ns.getDocumentStore(), oldestDeleted, deletedCount);
        rec = new VersionGCRecommendations(oneDayMs, ns.getCheckpoints(), ns.getClock(), support, options,
                monitor);
        stats = new VersionGCStats();
        stats.limitExceeded = false;
        stats.deletedDocGCCount = deleted;
        stats.deletedLeafDocGCCount = 0;
        rec.evaluate(stats);
        if (!stats.needRepeat || iteration >= maxIterations) {
            break;
        }
    }
    assertTrue("VersionGC should have finished after " + maxIterations + " iterations, but did not. Last scope was: "
            + rec.scope + ".", !stats.needRepeat);
}
// OAK-7378
/**
 * After a first GC run, subsequent runs five (virtual) seconds apart must
 * not call {@code getDeletedOnceCount()} on the GC support anymore.
 */
@Test
public void recommendedInterval() throws Exception {
    AtomicLong countCalls = new AtomicLong();
    // GC support that records every getDeletedOnceCount() invocation
    VersionGCSupport countingSupport = new VersionGCSupport(store) {
        @Override
        public long getDeletedOnceCount() {
            countCalls.incrementAndGet();
            return Iterables.size(Utils.getSelectedDocuments(store, NodeDocument.DELETED_ONCE, 1));
        }
    };
    // override the gc with one using the custom VersionGCSupport
    gc = new VersionGarbageCollector(ns, countingSupport);

    // run first RGC
    gc.gc(1, TimeUnit.HOURS);

    // afterwards there should be no more calls to getDeletedOnceCount()
    countCalls.set(0);

    // try a couple of runs every five seconds to simulate continuous RGC
    int run = 0;
    while (run < 10) {
        advanceClock(5, SECONDS);
        gc.gc(1, TimeUnit.HOURS);
        assertEquals(0, countCalls.get());
        run++;
    }
}
/**
 * Starts a GC run (max revision age: 30 minutes) on the background
 * executor.
 *
 * @return a future that yields the stats of the run
 */
private Future<VersionGCStats> gc() {
    // typed as Callable so the checked exception of gc.gc() is allowed
    Callable<VersionGCStats> gcTask = () -> gc.gc(30, TimeUnit.MINUTES);
    return execService.submit(gcTask);
}
/**
 * Removes the child with the given name below the root and merges the
 * change into the node store.
 *
 * @param name name of the child node to remove
 * @throws CommitFailedException if the merge fails
 */
private void removeNode(String name) throws CommitFailedException {
    NodeBuilder rootBuilder = ns.getRoot().builder();
    NodeBuilder child = rootBuilder.child(name);
    child.remove();
    merge(ns, rootBuilder);
}
/**
 * Adds a child with the given name below the root and merges the change
 * into the node store.
 *
 * @param name name of the child node to create
 * @throws CommitFailedException if the merge fails
 */
private void createNode(String name) throws CommitFailedException {
    NodeBuilder rootBuilder = ns.getRoot().builder();
    // child() is called only for its effect on the builder
    rootBuilder.child(name);
    merge(ns, rootBuilder);
}
/**
 * Merges the builder's changes into the given node store using an empty
 * commit hook and empty commit info.
 *
 * @param store   the node store to merge into (shadows the field of the same name)
 * @param builder the builder holding the pending changes
 * @throws CommitFailedException if the merge fails
 */
private void merge(DocumentNodeStore store, NodeBuilder builder)
        throws CommitFailedException {
    store.merge(
            builder,
            EmptyHook.INSTANCE,
            CommitInfo.EMPTY);
}
/**
 * Moves the node store's clock forward by the given amount of time.
 *
 * @param time amount to advance
 * @param unit unit of {@code time}
 * @throws InterruptedException if waiting on the clock is interrupted
 */
private void advanceClock(long time, TimeUnit unit)
        throws InterruptedException {
    Clock clock = ns.getClock();
    long target = clock.getTime() + unit.toMillis(time);
    clock.waitUntil(target);
}
private class TestStore extends MemoryDocumentStore {
Semaphore semaphore = new Semaphore(1);
AtomicLong findVersionGC = new AtomicLong();
@NotNull
@Override
public <T extends Document> List<T> query(Collection<T> collection,
String fromKey,
String toKey,
String indexedProperty,
long startValue,
int limit) {
semaphore.acquireUninterruptibly();
try {
return super.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
} finally {
semaphore.release();
}
}
@Override
public <T extends Document> T find(Collection<T> collection,
String key) {
if (collection == Collection.SETTINGS
&& key.equals("versionGC")) {
findVersionGC.incrementAndGet();
}
return super.find(collection, key);
}
}
private class TestGCMonitor implements GCMonitor {
final List<String> infoMessages = Lists.newArrayList();
final List<String> statusMessages = Lists.newArrayList();
@Override
public void info(String message, Object... arguments) {
this.infoMessages.add(arrayFormat(message, arguments).getMessage());
}
@Override
public void warn(String message, Object... arguments) {
}
@Override
public void error(String message, Exception exception) {
}
@Override
public void skipped(String reason, Object... arguments) {
}
@Override
public void compacted() {
}
@Override
public void cleaned(long reclaimedSize, long currentSize) {
}
@Override
public void updateStatus(String status) {
this.statusMessages.add(status);
}
public List<String> getInfoMessages() {
return this.infoMessages;
}
public List<String> getStatusMessages() {
return this.statusMessages;
}
}
/**
 * Creates a {@link VersionGCSupport} on the given store that reports fixed
 * values instead of inspecting the store.
 *
 * @param ds            the document store passed to the support
 * @param oldestDeleted value returned by {@code getOldestDeletedOnceTimestamp()}
 * @param countDeleted  value returned by {@code getDeletedOnceCount()}
 * @return the faked support
 */
private VersionGCSupport fakeVersionGCSupport(final DocumentStore ds, final long oldestDeleted, final long countDeleted) {
    VersionGCSupport fake = new VersionGCSupport(ds) {
        @Override
        public long getDeletedOnceCount() {
            return countDeleted;
        }

        @Override
        public long getOldestDeletedOnceTimestamp(Clock clock, long precisionMs) {
            // both arguments are ignored; the canned value is returned
            return oldestDeleted;
        }
    };
    return fake;
}
}