blob: 103a6ec4a115ff70f5a9e5c86b66312596a632f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.commons.junit.LogDumper;
import org.apache.jackrabbit.oak.commons.junit.LogLevelModifier;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.jetbrains.annotations.NotNull;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import static java.util.Collections.synchronizedList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class JournalTest extends AbstractJournalTest {
private MemoryDocumentStore ds;
private MemoryBlobStore bs;
@Rule
public TestRule logDumper = new LogDumper();
@Rule
public TestRule logLevelModifier = new LogLevelModifier()
.addAppenderFilter("file", "warn")
.setLoggerLevel("org.apache.jackrabbit.oak", "trace");
/**
 * Observer that collects every root state delivered via
 * {@link #contentChanged} and diffs consecutive roots against each other,
 * either synchronously through {@link #processAll()} or on a daemon
 * background thread ({@link #run()}). All {@link NodeStateDiff} callbacks
 * simply return {@code true}; the observer is used by the tests only to
 * count how many root states were delivered/diffed (see {@link #getTotal()}).
 */
class DiffingObserver implements Observer, Runnable, NodeStateDiff {
    // queue of roots received but not yet diffed; its monitor also guards
    // diffedRootStates1
    final List<DocumentNodeState> incomingRootStates1 = Lists.newArrayList();
    // roots that have already been diffed/processed
    final List<DocumentNodeState> diffedRootStates1 = Lists.newArrayList();
    // base state for the next diff; only touched by the processing side
    // (either the background thread or the caller of processOne/processAll)
    DocumentNodeState oldRoot = null;

    /**
     * @param startInBackground if {@code true}, starts a daemon thread that
     *        continuously drains and diffs incoming root states
     */
    DiffingObserver(boolean startInBackground) {
        if (startInBackground) {
            // start the diffing in the background - so as to not
            // interfere with the contentChanged call
            Thread th = new Thread(this);
            th.setDaemon(true);
            th.start();
        }
    }

    /** Discards all queued and already-diffed root states. */
    public void clear() {
        synchronized(incomingRootStates1) {
            incomingRootStates1.clear();
            diffedRootStates1.clear();
        }
    }

    @Override
    public void contentChanged(@NotNull NodeState root,@NotNull CommitInfo info) {
        synchronized(incomingRootStates1) {
            incomingRootStates1.add((DocumentNodeState) root);
            // wake up the background thread possibly blocked in run()
            incomingRootStates1.notifyAll();
        }
    }

    /** Synchronously diffs all currently queued root states. */
    public void processAll() {
        while(processOne()) {
            // continue
        }
    }

    /**
     * Diffs the oldest queued root state against the previously processed
     * one (if any) and moves it to the diffed list.
     *
     * @return {@code false} if the queue was empty, {@code true} otherwise
     */
    public boolean processOne() {
        DocumentNodeState newRoot;
        synchronized(incomingRootStates1) {
            if (incomingRootStates1.size()==0) {
                return false;
            }
            newRoot = incomingRootStates1.remove(0);
        }
        // diff outside the lock so contentChanged() is never blocked by it
        if (oldRoot!=null) {
            newRoot.compareAgainstBaseState(oldRoot, this);
        }
        oldRoot = newRoot;
        synchronized(incomingRootStates1) {
            diffedRootStates1.add(newRoot);
        }
        return true;
    }

    /**
     * Background loop: blocks until a root state is queued, then diffs it
     * (same logic as {@link #processOne()}). Never terminates; the thread
     * is a daemon, so it dies with the JVM.
     */
    @Override
    public void run() {
        while(true) {
            DocumentNodeState newRoot;
            synchronized(incomingRootStates1) {
                while(incomingRootStates1.size()==0) {
                    try {
                        incomingRootStates1.wait();
                    } catch (InterruptedException e) {
                        // ignore: the enclosing while re-checks the queue and
                        // waits again (test-only daemon, no shutdown needed)
                    }
                }
                newRoot = incomingRootStates1.remove(0);
            }
            if (oldRoot!=null) {
                newRoot.compareAgainstBaseState(oldRoot, this);
            }
            oldRoot = newRoot;
            synchronized(incomingRootStates1) {
                diffedRootStates1.add(newRoot);
            }
        }
    }

    // NodeStateDiff callbacks: accept (and descend into) every change
    @Override
    public boolean propertyAdded(PropertyState after) {
        return true;
    }
    @Override
    public boolean propertyChanged(PropertyState before, PropertyState after) {
        return true;
    }
    @Override
    public boolean propertyDeleted(PropertyState before) {
        return true;
    }
    @Override
    public boolean childNodeAdded(String name, NodeState after) {
        return true;
    }
    @Override
    public boolean childNodeChanged(String name, NodeState before,
            NodeState after) {
        return true;
    }
    @Override
    public boolean childNodeDeleted(String name, NodeState before) {
        return true;
    }

    /** @return number of root states received so far (queued plus diffed) */
    public int getTotal() {
        synchronized(incomingRootStates1) {
            return incomingRootStates1.size() + diffedRootStates1.size();
        }
    }
}
@Test
public void cleanupTest() throws Exception {
    DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0);
    DocumentNodeStore ns1 = mk1.getNodeStore();
    // make sure we're visible and marked as active
    ns1.renewClusterIdLease();
    // initial cleanup; OAK-2979: wait 100ms before doing it
    Thread.sleep(100);
    jGC(ns1, 1, TimeUnit.MILLISECONDS);
    Thread.sleep(100);
    // with an empty journal, gc must report 0 regardless of the max age
    long[] maxAges = { 1, 6, 1, 10, 1, 1, 1 };
    TimeUnit[] units = { TimeUnit.DAYS, TimeUnit.HOURS, TimeUnit.HOURS,
            TimeUnit.MINUTES, TimeUnit.MINUTES, TimeUnit.SECONDS,
            TimeUnit.MILLISECONDS };
    for (int i = 0; i < maxAges.length; i++) {
        assertEquals(0, jGC(ns1, maxAges[i], units[i]));
    }
    // create some entries that can be deleted thereupon
    mk1.commit("/", "+\"regular1\": {}", null, null);
    mk1.commit("/", "+\"regular2\": {}", null, null);
    mk1.commit("/", "+\"regular3\": {}", null, null);
    mk1.commit("/regular2", "+\"regular4\": {}", null, null);
    Thread.sleep(100);
    // nothing flushed to the journal yet, so still nothing to collect
    assertEquals(0, jGC(ns1, 5, TimeUnit.SECONDS));
    assertEquals(0, jGC(ns1, 1, TimeUnit.MILLISECONDS));
    ns1.runBackgroundOperations();
    mk1.commit("/", "+\"regular5\": {}", null, null);
    ns1.runBackgroundOperations();
    mk1.commit("/", "+\"regular6\": {}", null, null);
    ns1.runBackgroundOperations();
    Thread.sleep(100);
    // the flushed entries are too young for a 5s age limit ...
    assertEquals(0, jGC(ns1, 5, TimeUnit.SECONDS));
    // ... but all three of them are collected with a 1ms limit
    assertEquals(3, jGC(ns1, 1, TimeUnit.MILLISECONDS));
}
@Test
public void journalTest() throws Exception {
    DocumentMK mk1 = createMK(1, 0);
    DocumentNodeStore ns1 = mk1.getNodeStore();
    CountingDocumentStore docStore1 = builder.actualStore;
    CountingDiffCache diffCache1 = builder.actualDiffCache;
    DocumentMK mk2 = createMK(2, 0);
    DocumentNodeStore ns2 = mk2.getNodeStore();
    CountingDocumentStore docStore2 = builder.actualStore;
    CountingDiffCache diffCache2 = builder.actualDiffCache;

    final DiffingObserver observer = new DiffingObserver(false);
    ns1.addObserver(observer);

    // establish an initial 'oldRoot' in the observer, then reset counters
    ns1.runBackgroundOperations();
    ns2.runBackgroundOperations();
    observer.processAll();
    observer.clear();
    docStore1.resetCounters();
    docStore2.resetCounters();
    diffCache1.resetLoadCounter();
    diffCache2.resetLoadCounter();

    // changes made on instance 2 ...
    mk2.commit("/", "+\"regular1\": {}", null, null);
    mk2.commit("/", "+\"regular2\": {}", null, null);
    mk2.commit("/", "+\"regular3\": {}", null, null);
    mk2.commit("/regular2", "+\"regular4\": {}", null, null);
    // ... and flushed to the journal
    ns2.runBackgroundOperations();

    // instance 1 has not read anything yet: no notifications, no NODES
    // reads/writes, no diff-cache loads
    assertEquals(0, observer.getTotal());
    assertEquals(0, docStore1.getNumFindCalls(Collection.NODES));
    assertEquals(0, docStore1.getNumQueryCalls(Collection.NODES));
    assertEquals(0, docStore1.getNumRemoveCalls(Collection.NODES));
    assertEquals(0, docStore1.getNumCreateOrUpdateCalls(Collection.NODES));
    assertEquals(0, diffCache1.getLoadCount());

    // let instance 1 pick up the external changes
    ns1.runBackgroundOperations();
    mk2.commit("/", "+\"regular5\": {}", null, null);
    ns2.runBackgroundOperations();
    ns1.runBackgroundOperations();
    // and let the observer diff everything that was delivered
    observer.processAll();
    docStore1.printStacks = false;

    // two external change sets were observed, without any diff-cache
    // loads or NODES traffic on instance 1
    assertEquals(2, observer.getTotal());
    assertEquals(0, diffCache1.getLoadCount());
    assertEquals(0, docStore1.getNumRemoveCalls(Collection.NODES));
    assertEquals(0, docStore1.getNumCreateOrUpdateCalls(Collection.NODES));
    assertEquals(0, docStore1.getNumQueryCalls(Collection.NODES));
    // NOTE(review): getNumFindCalls is deliberately not asserted here
    // (it was commented out in the original as well)
}
@Test
public void externalBranchChange() throws Exception {
    DocumentMK mk1 = createMK(1, 0);
    DocumentNodeStore ns1 = mk1.getNodeStore();
    DocumentMK mk2 = createMK(2, 0);
    DocumentNodeStore ns2 = mk2.getNodeStore();
    ns1.runBackgroundOperations();
    ns2.runBackgroundOperations();

    // a series of regular commits on instance 1, each flushed to the journal
    mk1.commit("/", "+\"regular1\": {}", null, null);
    ns1.runBackgroundOperations();
    mk1.commit("/regular1", "+\"regular1child\": {}", null, null);
    ns1.runBackgroundOperations();
    for (int i = 2; i <= 5; i++) {
        mk1.commit("/", "+\"regular" + i + "\": {}", null, null);
        ns1.runBackgroundOperations();
    }

    // a branch commit, merged into head
    String b1 = mk1.branch(null);
    b1 = mk1.commit("/", "+\"branchVisible\": {}", b1, null);
    mk1.merge(b1, null);

    // to flush the branch commit either dispose of mk1 or run the
    // background operations explicitly (as that will propagate the
    // lastRev to the root)
    ns1.runBackgroundOperations();
    ns2.runBackgroundOperations();

    // instance 2 must now see the merged branch node as well
    String nodes = mk2.getNodes("/", null, 0, 0, 100, null);
    assertEquals("{\"branchVisible\":{},\"regular1\":{},\"regular2\":{},\"regular3\":{},\"regular4\":{},\"regular5\":{},\":childNodeCount\":6}", nodes);
}
/** Inspired by LastRevRecoveryTest.testRecover() - simplified and extended with journal related asserts **/
@Test
public void lastRevRecoveryJournalTest() throws Exception {
// single-threaded variant: one recovery run plus no-op re-runs
doLastRevRecoveryJournalTest(false);
}
/** Inspired by LastRevRecoveryTest.testRecover() - simplified and extended with journal related asserts **/
@Test
public void lastRevRecoveryJournalTestWithConcurrency() throws Exception {
// concurrent variant: 200 threads race on the same recovery
doLastRevRecoveryJournalTest(true);
}
/**
 * Creates /x/y on instance 1, lets instance 2 add /x/y/z without flushing
 * its _lastRev updates, then verifies that {@code LastRevRecoveryAgent}
 * recovery updates the _lastRev entries and produces the expected journal
 * entries.
 *
 * @param testConcurrency if {@code true}, the recovery is additionally run
 *        from 200 threads at once; each run must either succeed or fail
 *        with the known "concurrent update" exception
 */
private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception {
    DocumentMK mk1 = createMK(1, 0);
    DocumentNodeStore ds1 = mk1.getNodeStore();
    int c1Id = ds1.getClusterId();
    DocumentMK mk2 = createMK(2, 0);
    DocumentNodeStore ds2 = mk2.getNodeStore();
    final int c2Id = ds2.getClusterId();
    // should have no journal entries yet
    assertEquals(0, countJournalEntries(ds1, 10));
    assertEquals(0, countJournalEntries(ds2, 10));
    //1. Create base structure /x/y
    NodeBuilder b1 = ds1.getRoot().builder();
    b1.child("x").child("y");
    ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    ds1.runBackgroundOperations();
    //lastRev are persisted directly for new nodes. In case of
    // updates they are persisted via background jobs
    //1.2 Get last rev populated for root node for ds2
    ds2.runBackgroundOperations();
    NodeBuilder b2 = ds2.getRoot().builder();
    b2.child("x").setProperty("f1","b1");
    ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    ds2.runBackgroundOperations();
    //2. Add a new node /x/y/z (not flushed: _lastRev stays unpersisted)
    b2 = ds2.getRoot().builder();
    b2.child("x").child("y").child("z").setProperty("foo", "bar");
    ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    //Refresh DS1
    ds1.runBackgroundOperations();
    final NodeDocument z1 = getDocument(ds1, "/x/y/z");
    NodeDocument y1 = getDocument(ds1, "/x/y");
    final NodeDocument x1 = getDocument(ds1, "/x");
    Revision zlastRev2 = z1.getLastRev().get(c2Id);
    // /x/y/z is a new node and does not have a _lastRev
    assertNull(zlastRev2);
    Revision head2 = ds2.getHeadRevision().getRevision(ds2.getClusterId());
    //lastRev should not be updated for C #2
    assertNull(y1.getLastRev().get(c2Id));
    final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds, ds1);
    // journal contents at this point:
    final String change1 = "{\"x\":{\"y\":{}}}";
    assertJournalEntries(ds1, change1);
    final String change2 = "{\"x\":{}}";
    assertJournalEntries(ds2, change2);
    String change2b = "{\"x\":{\"y\":{\"z\":{}}}}";
    if (!testConcurrency) {
        //Do not pass y1 but still y1 should be updated
        recovery.recover(Lists.newArrayList(x1,z1), c2Id);
        //Post recovery the lastRev should be updated for /x/y and /x
        assertEquals(head2, getDocument(ds1, "/x/y").getLastRev().get(c2Id));
        assertEquals(head2, getDocument(ds1, "/x").getLastRev().get(c2Id));
        assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id));
        // now 1 is unchanged, but 2 was recovered now, so has one more:
        assertJournalEntries(ds1, change1); // unchanged
        assertJournalEntries(ds2, change2, change2b);
        // just some no-ops:
        recovery.recover(c2Id);
        recovery.recover(Collections.<NodeDocument>emptyList(), c2Id);
        assertJournalEntries(ds1, change1); // unchanged
        assertJournalEntries(ds2, change2, change2b);
    } else {
        // do some concurrency testing as well: each thread must either
        // succeed or fail with the known concurrent-update exception;
        // anything else is collected and rethrown below
        final int NUM_THREADS = 200;
        final CountDownLatch ready = new CountDownLatch(NUM_THREADS);
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch end = new CountDownLatch(NUM_THREADS);
        final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>());
        for (int i = 0; i < NUM_THREADS; i++) {
            Thread th = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        ready.countDown();
                        start.await();
                        recovery.recover(Lists.newArrayList(x1,z1), c2Id);
                    } catch (DocumentStoreException e) {
                        if (!e.getMessage().matches("Update of root document to _lastRev .* failed. Detected concurrent update")) {
                            exceptions.add(e);
                        }
                        // else: we have to accept this exception to happen
                    } catch (Exception e) {
                        exceptions.add(e);
                    } finally {
                        // BUGFIX: count down exactly once per thread. The
                        // previous version also counted down in the
                        // accepted-exception branch, which released
                        // end.await() before all threads had finished.
                        end.countDown();
                    }
                }
            });
            th.start();
        }
        // fail loudly instead of silently ignoring a start-up timeout
        assertTrue("worker threads not ready within 5s",
                ready.await(5, TimeUnit.SECONDS));
        start.countDown();
        assertTrue(end.await(20, TimeUnit.SECONDS));
        assertJournalEntries(ds1, change1); // unchanged
        assertJournalEntries(ds2, change2, change2b);
        for (Exception ex : exceptions) {
            throw ex;
        }
    }
}
// OAK-3433
@Test
public void journalEntryKey() throws Exception {
    DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
    DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
    // commit on instance 1 and flush it
    NodeBuilder b1 = ns1.getRoot().builder();
    b1.child("foo");
    ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    ns1.runBackgroundOperations();
    // commit on instance 2, capture its head revision before the flush
    NodeBuilder b2 = ns2.getRoot().builder();
    b2.child("bar");
    ns2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    Revision h2 = ns2.getHeadRevision().getRevision(ns2.getClusterId());
    assertNotNull(h2);
    ns2.runBackgroundReadOperations();
    ns2.runBackgroundOperations();
    // the background update must have written a journal entry keyed by h2
    String id = JournalEntry.asId(h2);
    // use assertNotNull instead of assertTrue(... != null) for a clearer
    // failure message and idiomatic intent
    assertNotNull("Background update did not create a journal entry with id " + id,
            ns1.getDocumentStore().find(Collection.JOURNAL, id));
}
/**
 * Runs the journal garbage collector on the given store.
 *
 * @param ns the node store whose journal is collected
 * @param maxRevisionAge entries older than this age are collected
 * @param unit the unit of {@code maxRevisionAge}
 * @return the number of journal entries that were removed
 */
private int jGC(DocumentNodeStore ns, long maxRevisionAge, TimeUnit unit) {
    long maxAgeMillis = unit.toMillis(maxRevisionAge);
    JournalGarbageCollector gc = new JournalGarbageCollector(ns, maxAgeMillis);
    return gc.gc();
}
/**
 * Creates a DocumentMK for the given cluster id, backed by the shared
 * in-memory document and blob stores (lazily created on first use so all
 * instances in a test see the same backend).
 */
private DocumentMK createMK(int clusterId, int asyncDelay) {
    if (bs == null) {
        bs = new MemoryBlobStore();
    }
    if (ds == null) {
        ds = new MemoryDocumentStore();
    }
    return createMK(clusterId, asyncDelay, ds, bs);
}
}