blob: fb08250cee63f7ad6e014db451d045a37492a82e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static com.google.common.collect.Iterables.size;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.NUM_REVS_THRESHOLD;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.PREV_SPLIT_FACTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Runs a DocumentMK revision GC interleaved with a document split.
* Test for OAK-1791.
*/
@RunWith(Parameterized.class)
public class VersionGCWithSplitTest {
private DocumentStoreFixture fixture;
private Clock clock;
private Map<Thread, Semaphore> updateLocks = Maps.newIdentityHashMap();
private DocumentNodeStore store;
private VersionGarbageCollector gc;
public VersionGCWithSplitTest(DocumentStoreFixture fixture) {
this.fixture = fixture;
}
@Parameterized.Parameters(name = "{0}")
public static java.util.Collection<Object[]> fixtures() throws IOException {
List<Object[]> fixtures = Lists.newArrayList();
fixtures.add(new Object[] {new DocumentStoreFixture.MemoryFixture()});
DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
if(mongo.isAvailable()){
fixtures.add(new Object[] {mongo});
}
return fixtures;
}
@Before
public void setUp() throws InterruptedException {
final DocumentStore docStore = fixture.createDocumentStore();
DocumentStore testStore = new TestStore(docStore);
clock = new Clock.Virtual();
store = new DocumentMK.Builder()
.clock(clock)
.setLeaseCheckMode(LeaseCheckMode.LENIENT)
.setDocumentStore(testStore)
.setAsyncDelay(0)
.getNodeStore();
gc = store.getVersionGarbageCollector();
//Baseline the clock
clock.waitUntil(Revision.getCurrentTimestamp());
}
@After
public void tearDown() throws Exception {
store.dispose();
fixture.dispose();
Revision.resetClockToDefault();
}
@Test
public void gcWithConcurrentSplit() throws Exception {
Revision.setClock(clock);
NodeBuilder builder = store.getRoot().builder();
builder.child("test").setProperty("prop", -1);
merge(store, builder);
final String id = Utils.getIdFromPath("/test");
DocumentStore docStore = store.getDocumentStore();
int count = 0;
while (docStore.find(NODES, id).getPreviousRanges().size() < PREV_SPLIT_FACTOR) {
builder = store.getRoot().builder();
builder.child("test").setProperty("prop", count);
merge(store, builder);
if (count++ % NUM_REVS_THRESHOLD == 0) {
store.runBackgroundOperations();
}
}
// wait one hour
clock.waitUntil(Revision.getCurrentTimestamp() +
HOURS.toMillis(1) +
2 * SECONDS.toMillis(MODIFIED_IN_SECS_RESOLUTION));
final AtomicReference<VersionGarbageCollector.VersionGCStats> stats = new AtomicReference<VersionGarbageCollector.VersionGCStats>();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
stats.set(gc.gc(1, HOURS));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
// block gc thread when it attempts to write
Semaphore gcLock = new Semaphore(0);
updateLocks.put(t, gcLock);
t.start();
while (!gcLock.hasQueuedThreads()) {
Thread.sleep(1);
}
// perform more changes until intermediate docs are created
while (docStore.find(NODES, id).getPreviousRanges().size() >= PREV_SPLIT_FACTOR) {
builder = store.getRoot().builder();
builder.child("test").setProperty("prop", count);
merge(store, builder);
if (count++ % NUM_REVS_THRESHOLD == 0) {
store.runBackgroundOperations();
}
}
// let gc thread continue
gcLock.release();
// the first split doc disconnect will be based on the main document
// with 10 previous entries (before the intermediate doc was created).
// the next 9 disconnect calls will see the updated main document
// pointing to the intermediate doc. those 9 split docs will be
// disconnected from there.
t.join();
assertEquals(10, stats.get().splitDocGCCount);
NodeDocument doc = docStore.find(NODES, id);
// there must only be one stale entry because 9 other split docs were
// disconnected from the new intermediate split doc
assertEquals(1, doc.getStalePrev().size());
store.addSplitCandidate(id);
store.runBackgroundOperations();
// now there must not be any stale prev entries
doc = docStore.find(NODES, id);
assertTrue(doc.getStalePrev().isEmpty());
Map<Revision, String> valueMap = doc.getValueMap("prop");
// there must be 101 revisions left. one in the main document and one
// split document with 100 revisions created after the GC was triggered
assertEquals(NUM_REVS_THRESHOLD + 1, valueMap.size());
// also count them individually
assertEquals(NUM_REVS_THRESHOLD + 1, size(valueMap.entrySet()));
}
private void merge(DocumentNodeStore store, NodeBuilder builder)
throws CommitFailedException {
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
private final class TestStore extends TimingDocumentStoreWrapper {
private final DocumentStore docStore;
public TestStore(DocumentStore base) {
super(base);
this.docStore = base;
}
@Override
public <T extends Document> T createOrUpdate(final Collection<T> collection,
final UpdateOp update) {
final AtomicReference<T> ref = new AtomicReference<T>();
runLocked(new Runnable() {
public void run() {
ref.set(docStore.createOrUpdate(collection, update));
}
});
return ref.get();
}
@Override
public <T extends Document> T findAndUpdate(final Collection<T> collection,
final UpdateOp update) {
final AtomicReference<T> ref = new AtomicReference<T>();
runLocked(new Runnable() {
public void run() {
ref.set(docStore.findAndUpdate(collection, update));
}
});
return ref.get();
}
private void runLocked(Runnable run) {
Thread t = Thread.currentThread();
Semaphore s = updateLocks.get(t);
if (s != null) {
s.acquireUninterruptibly();
}
try {
run.run();
} finally {
if (s != null) {
s.release();
}
}
}
}
}