blob: 9efd6681cbeda642e51561e9732abe610aac9fc1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
import static org.apache.jackrabbit.oak.plugins.document.Path.ROOT;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class CollisionTest {
@Rule
public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
private static final AtomicInteger COUNTER = new AtomicInteger();
private Clock clock;
private List<Object> branches = new ArrayList<>();
@Before
public void before() throws Exception {
clock = new Clock.Virtual();
clock.waitUntil(System.currentTimeMillis());
Revision.setClock(clock);
ClusterNodeInfo.setClock(clock);
}
@After
public void after() {
branches.clear();
}
@AfterClass
public static void afterClass() {
Revision.resetClockToDefault();
ClusterNodeInfo.resetClockToDefault();
}
// OAK-2342
@Test
public void purge() throws Exception {
DocumentMK mk1 = newBuilder().setClusterId(1).open();
DocumentNodeStore ns1 = mk1.getNodeStore();
DocumentStore store = ns1.getDocumentStore();
DocumentMK mk2 = newBuilder().setClusterId(2)
.setDocumentStore(store).open();
DocumentNodeStore ns2 = mk2.getNodeStore();
createCollision(mk1);
createCollision(mk2);
String id = getIdFromPath("/");
assertEquals(2, getDocument(store, id).getLocalMap(COLLISIONS).size());
// restart node store
ns1.dispose();
mk1 = newBuilder().setClusterId(1)
.setDocumentStore(store).open();
ns1 = mk1.getNodeStore();
// must purge collision for clusterId 1
assertEquals(1, getDocument(store, id).getLocalMap(COLLISIONS).size());
ns1.dispose();
// restart other node store
ns2.dispose();
mk2 = newBuilder().setClusterId(2)
.setDocumentStore(store).open();
ns2 = mk2.getNodeStore();
// must purge collision for clusterId 2
assertEquals(0, getDocument(store, id).getLocalMap(COLLISIONS).size());
ns2.dispose();
}
@Test
public void isConflicting() throws CommitFailedException {
DocumentNodeStore ns = newBuilder()
.setAsyncDelay(0).getNodeStore();
DocumentStore store = ns.getDocumentStore();
String id = Utils.getIdFromPath("/test");
NodeBuilder b = ns.getRoot().builder();
b.child("test").setProperty("p", "a");
// test:{p:"a"}
Revision r1 = merge(ns, b).getRevision(ns.getClusterId());
assertNotNull(r1);
NodeDocument doc = getDocument(store, id);
// concurrent create
Revision c = ns.newRevision();
UpdateOp op = new UpdateOp(id, true);
NodeDocument.setDeleted(op, c, false);
Collision col = newCollision(doc, r1, op, c, ns);
assertTrue(col.isConflicting());
// concurrent change
op = new UpdateOp(id, false);
op.setMapEntry("p", c, "b");
col = newCollision(doc, r1, op, c, ns);
assertTrue(col.isConflicting());
b = ns.getRoot().builder();
b.child("test").setProperty("p", "b");
// test:{p:"b"}
Revision r2 = merge(ns, b).getRevision(ns.getClusterId());
assertNotNull(r2);
doc = getDocument(store, id);
// concurrent delete
c = ns.newRevision();
op = new UpdateOp(id, false);
op.setDelete(true);
NodeDocument.setDeleted(op, c, true);
col = newCollision(doc, r2, op, c, ns);
assertTrue(col.isConflicting());
// concurrent conflicting property set
op = new UpdateOp(id, false);
op.setMapEntry("p", c, "c");
col = newCollision(doc, r2, op, c, ns);
assertTrue(col.isConflicting());
// concurrent non-conflicting property set
op = new UpdateOp(id, false);
op.setMapEntry("q", c, "a");
col = newCollision(doc, r2, op, c, ns);
assertFalse(col.isConflicting());
b = ns.getRoot().builder();
b.child("test").remove();
// test (removed)
Revision r3 = merge(ns, b).getRevision(ns.getClusterId());
assertNotNull(r3);
doc = getDocument(store, id);
// concurrent delete
c = ns.newRevision();
op = new UpdateOp(id, false);
op.setDelete(true);
NodeDocument.setDeleted(op, c, true);
col = newCollision(doc, r3, op, c, ns);
assertTrue(col.isConflicting());
// concurrent conflicting property set
op = new UpdateOp(id, false);
op.setMapEntry("p", c, "d");
col = newCollision(doc, r3, op, c, ns);
assertTrue(col.isConflicting());
}
private static Collision newCollision(@NotNull NodeDocument document,
@NotNull Revision theirRev,
@NotNull UpdateOp ourOp,
@NotNull Revision ourRev,
@NotNull RevisionContext context) {
return new Collision(document, theirRev, ourOp, ourRev, context,
RevisionVector.fromString(""));
}
@Test
public void collisionOnOrphanedBranch() throws Exception {
DocumentStore store = new MemoryDocumentStore();
DocumentNodeStore ns = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).build();
NodeBuilder builder = ns.getRoot().builder();
// force a branch commit
for (int i = 0; i < 20; i++) {
builder.child("n-" + i).setProperty("p", "v");
}
retainBranches(ns);
ns.dispose();
NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
assertThat(root.getLocalBranchCommits(), not(empty()));
// make sure start timestamp of node store is higher than
// branch commit revision timestamp
clock.getTimeIncreasing();
// start it up again
ns = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).build();
ns.updateClusterState();
root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
// on init the DocumentNodeStore removes orphaned
// branch commit entries on the root document
assertThat(root.getLocalBranchCommits(), empty());
// but some changes are still there
Path p = Path.fromString("/n-0");
NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p));
assertNotNull(doc);
assertThat(doc.getLocalBranchCommits(), not(empty()));
builder = ns.getRoot().builder();
builder.child("n-0");
merge(ns, builder);
// must not create a collision marker for a branch commit
// that cannot be merged
assertNoCollisions(store, ROOT);
}
@Test
public void collisionOnForeignOrphanedBranch() throws Exception {
DocumentStore store = new MemoryDocumentStore();
DocumentNodeStore ns1 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).setClusterId(1).build();
DocumentNodeStore ns2 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).setClusterId(2).build();
NodeBuilder builder = ns1.getRoot().builder();
// force a branch commit
for (int i = 0; i < 20; i++) {
builder.child("n-" + i).setProperty("p", "v");
}
retainBranches(ns1);
ns1.dispose();
ns2.updateClusterState();
NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
assertThat(root.getLocalBranchCommits(), not(empty()));
// but some changes are still there
Path p = Path.fromString("/n-0");
NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p));
assertNotNull(doc);
assertThat(doc.getLocalBranchCommits(), not(empty()));
builder = ns2.getRoot().builder();
builder.child("n-0");
merge(ns2, builder);
// must create a collision marker for a branch commit because
// it is not known when ns1 was stopped
root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
assertThat(root.getLocalMap(COLLISIONS).keySet(), not(empty()));
}
@Test
public void collisionOnForeignOrphanedBranchAfterRestart() throws Exception {
DocumentStore store = new MemoryDocumentStore();
DocumentNodeStore ns1 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).setClusterId(1).build();
DocumentNodeStore ns2 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).setClusterId(2).build();
NodeBuilder builder = ns1.getRoot().builder();
// force a branch commit
for (int i = 0; i < 20; i++) {
builder.child("n-" + i).setProperty("p", "v");
}
retainBranches(ns1);
ns1.dispose();
NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
assertThat(root.getLocalBranchCommits(), not(empty()));
// make sure start timestamp of node store is higher than
// branch commit revision timestamp
clock.getTimeIncreasing();
// start it up again
ns1 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(10).setClusterId(1).build();
root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
// on init the DocumentNodeStore removes orphaned
// branch commit entries on the root document
assertThat(root.getLocalBranchCommits(), empty());
// but some changes are still there
Path p = Path.fromString("/n-0");
NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p));
assertNotNull(doc);
assertThat(doc.getLocalBranchCommits(), not(empty()));
ns2.updateClusterState();
builder = ns2.getRoot().builder();
builder.child("n-0");
merge(ns2, builder);
// must not create a collision marker for a branch commit
// from a clusterId that is inactive
assertNoCollisions(store, ROOT);
}
@Test
public void collisionWithBranchOnForeignOrphanedBranchAfterRestart() throws Exception {
int updateLimit = 10;
DocumentStore store = new MemoryDocumentStore();
DocumentNodeStore ns1 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(updateLimit).setClusterId(1).build();
DocumentNodeStore ns2 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(updateLimit).setClusterId(2).build();
NodeBuilder builder = ns1.getRoot().builder();
// force a branch commit
for (int i = 0; i < updateLimit * 2; i++) {
builder.child("n-" + i).setProperty("p", "v");
}
retainBranches(ns1);
ns1.dispose();
NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
assertThat(root.getLocalBranchCommits(), not(empty()));
// make sure start timestamp of node store is higher than
// branch commit revision timestamp
clock.getTimeIncreasing();
// start it up again
ns1 = newBuilder()
.setDocumentStore(store).setAsyncDelay(0)
.setUpdateLimit(updateLimit).setClusterId(1).build();
root = store.find(NODES, Utils.getIdFromPath(ROOT));
assertNotNull(root);
// on init the DocumentNodeStore removes orphaned
// branch commit entries on the root document
assertThat(root.getLocalBranchCommits(), empty());
// but some changes are still there
Path p = Path.fromString("/n-0");
NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p));
assertNotNull(doc);
assertThat(doc.getLocalBranchCommits(), not(empty()));
ns2.updateClusterState();
builder = ns2.getRoot().builder();
builder.child("n-0");
// force a branch commit
for (int i = 0; i < updateLimit * 2; i++) {
builder.child("test").child("c-" + i);
}
merge(ns2, builder);
// must not create a collision marker for a branch commit
// from a clusterId that is inactive
assertNoCollisions(store, ROOT);
}
private static void assertNoCollisions(DocumentStore store, Path p) {
NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p));
assertNotNull(doc);
assertThat(doc.getLocalMap(COLLISIONS).keySet(), empty());
}
@NotNull
private static RevisionVector merge(DocumentNodeStore ns, NodeBuilder nb)
throws CommitFailedException {
ns.merge(nb, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return ns.getHeadRevision();
}
@NotNull
private static NodeDocument getDocument(DocumentStore store, String id) {
NodeDocument doc = store.find(NODES, id);
assertNotNull(doc);
return doc;
}
private void createCollision(DocumentMK mk) throws Exception {
String nodeName = "test-" + COUNTER.getAndIncrement();
// create branch
String b = mk.branch(null);
mk.commit("/", "+\"" + nodeName + "\":{\"p\":\"a\"}", b, null);
// commit a change resulting in a collision on branch
mk.commit("/", "+\"" + nodeName + "\":{\"p\":\"b\"}", null, null);
}
private DocumentMK.Builder newBuilder() {
return builderProvider.newBuilder().clock(clock);
}
/**
* Add all known branch referents to {@link #branches} to prevent clean up
* of orphaned branches.
*/
private void retainBranches(DocumentNodeStore ns) {
NodeDocument root = getRootDocument(ns.getDocumentStore());
for (Revision r : root.getLocalBranchCommits()) {
RevisionVector branchRev = new RevisionVector(r.asBranchRevision());
Branch b = ns.getBranches().getBranch(branchRev);
if (b == null) {
continue;
}
Branch.BranchReference ref = b.getRef();
if (ref == null) {
continue;
}
Object obj = ref.get();
if (obj != null) {
branches.add(obj);
}
}
}
}