/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.plugins.document;

import static org.apache.jackrabbit.oak.plugins.document.RecoveryHandler.NOOP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.HashSet;
import java.util.List;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mongodb.client.MongoDatabase;

import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.NotNull;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * A set of simple cluster tests.
 */
public class ClusterTest {

    @Rule
    public MongoConnectionFactory connectionFactory = new MongoConnectionFactory();

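    // toggle to run the tests against MongoDB instead of in-memory stores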
    private static final boolean MONGO_DB = false;
    // private static final boolean MONGO_DB = true;

    private List<DocumentMK> mks = Lists.newArrayList();
    private MemoryDocumentStore ds;
    private MemoryBlobStore bs;

    @After
    public void resetClock() {
        ClusterNodeInfo.resetClockToDefault();
    }

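    /**
     * Verifies that changes made on one of three cluster nodes become
     * visible on the others once background write and read operations
     * have run.
     */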
    @Test
    public void threeNodes() throws Exception {
        DocumentMK mk1 = createMK(1, 0);
        DocumentMK mk2 = createMK(2, 0);
        DocumentMK mk3 = createMK(3, 0);

        mk1.commit("/", "+\"test\":{}", null, null);
        mk2.commit("/", "+\"a\":{}", null, null);
        mk3.commit("/", "+\"b\":{}", null, null);
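        // two rounds of background operations are needed so that every
        // node both publishes its own change and picks up the changes
        // published by the others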
        mk2.backgroundWrite();
        mk2.backgroundRead();
        mk3.backgroundWrite();
        mk3.backgroundRead();
        mk1.backgroundWrite();
        mk1.backgroundRead();
        mk2.backgroundWrite();
        mk2.backgroundRead();
        mk3.backgroundWrite();
        mk3.backgroundRead();

        mk2.commit("/", "^\"test/x\":1", null, null);
        String n3 = mk3.getNodes("/test", mk3.getHeadRevision(), 0, 0, 10, null);
        // mk3 has not seen the previous change yet;
        // it is debatable whether this should prevent further changes
        // to this node (currently it does not)
        assertEquals("{\":childNodeCount\":0}", n3);
        mk3.commit("/", "^\"test/y\":2", null, null);

        mk3.backgroundWrite();
        mk3.backgroundRead();
        mk1.backgroundWrite();
        mk1.backgroundRead();

        String r1 = mk1.getHeadRevision();
        String n1 = mk1.getNodes("/test", r1, 0, 0, 10, null);
        // mk1 only sees the change from mk3 so far
        assertEquals("{\"y\":2,\":childNodeCount\":0}", n1);

        mk2.backgroundWrite();
        mk2.backgroundRead();
        mk1.backgroundWrite();
        mk1.backgroundRead();

        String r1b = mk1.getHeadRevision();
        String n1b = mk1.getNodes("/test", r1b, 0, 0, 10, null);
        JSONParser parser = new JSONParser();
        JSONObject obj = (JSONObject) parser.parse(n1b);
        // mk1 now sees both changes
        assertEquals(1L, obj.get("x"));
        assertEquals(2L, obj.get("y"));
        assertEquals(0L, obj.get(":childNodeCount"));
    }

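    /**
     * Verifies that an expired lease prevents reuse of the clusterId
     * when only a NOOP RecoveryHandler is available (OAK-7316).
     */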
    @Test
    public void clusterNodeInfoLease() throws InterruptedException {
        Clock c = new Clock.Virtual();
        c.waitUntil(System.currentTimeMillis());
        ClusterNodeInfo.setClock(c);

        MemoryDocumentStore store = new MemoryDocumentStore();
        ClusterNodeInfo c1, c2;
        c1 = ClusterNodeInfo.getInstance(store, NOOP, "m1", null, 0);
        assertEquals(1, c1.getId());
        // expire lease
        c.waitUntil(c1.getLeaseEndTime() + ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);

        // using a NOOP RecoveryHandler must prevent use of expired clusterId 1 (OAK-7316)
        c2 = ClusterNodeInfo.getInstance(store, NOOP, "m1", null, 0);
        assertEquals(2, c2.getId());
    }

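    /**
     * Repeatedly creates and removes the same node from a sequence of
     * cluster nodes sharing the same document and blob store.
     */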
    @Test
    public void openCloseOpen() {
        MemoryDocumentStore ds = new MemoryDocumentStore();
        MemoryBlobStore bs = new MemoryBlobStore();

        DocumentMK mk1 = createMK(1, 0, ds, bs);
        mk1.commit("/", "+\"a\": {}", null, null);
        mk1.commit("/", "-\"a\"", null, null);
        mk1.runBackgroundOperations();

        DocumentMK mk2 = createMK(2, 0, ds, bs);
        mk2.commit("/", "+\"a\": {}", null, null);
        mk2.commit("/", "-\"a\"", null, null);
        mk2.runBackgroundOperations();

        DocumentMK mk3 = createMK(3, 0, ds, bs);
        mk3.commit("/", "+\"a\": {}", null, null);
        mk3.commit("/", "-\"a\"", null, null);
        mk3.runBackgroundOperations();

        DocumentMK mk4 = createMK(4, 0, ds, bs);
        mk4.commit("/", "+\"a\": {}", null, null);
        mk4.runBackgroundOperations();

        DocumentMK mk5 = createMK(5, 0, ds, bs);
        mk5.commit("/", "-\"a\"", null, null);
        mk5.commit("/", "+\"a\": {}", null, null);
    }

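    /**
     * Verifies that only a merged branch commit becomes visible to other
     * cluster nodes, while unmerged branch commits stay invisible.
     */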
    @Test
    public void clusterBranchInVisibility() throws InterruptedException {
        DocumentMK mk1 = createMK(1);
        mk1.commit("/", "+\"regular\": {}", null, null);
        String b1 = mk1.branch(null);
        String b2 = mk1.branch(null);
        b1 = mk1.commit("/", "+\"branchVisible\": {}", b1, null);
        b2 = mk1.commit("/", "+\"branchInvisible\": {}", b2, null);
        mk1.merge(b1, null);
        // the merge on mk1 only becomes visible to mk2 after the async
        // delay; therefore dispose mk1 now to make sure it flushes its
        // unsaved last revisions
        disposeMK(mk1);

        DocumentMK mk2 = createMK(2);
        String nodes = mk2.getNodes("/", null, 0, 0, 100, null);
        assertEquals("{\"branchVisible\":{},\"regular\":{},\":childNodeCount\":2}", nodes);
    }

    /**
     * Test for OAK-1254
     */
    @Test
    public void clusterBranchRebase() throws Exception {
        DocumentMK mk1 = createMK(1, 0);
        mk1.commit("/", "+\"test\":{}", null, null);
        mk1.runBackgroundOperations();
        DocumentMK mk2 = createMK(2, 0);
        DocumentMK mk3 = createMK(3, 0);

        DocumentNodeStore ns3 = mk3.getNodeStore();
        // the next line is required for the test even though it only
        // reads from the node store. do not remove!
        traverse(ns3.getRoot(), "/");

        String b3 = mk3.branch(null);
        b3 = mk3.commit("/", "+\"mk3\":{}", b3, null);
        assertTrue(mk3.nodeExists("/test", b3));

        mk2.commit("/", "+\"test/mk21\":{}", null, null);
        mk2.runBackgroundOperations();

        mk3.runBackgroundOperations(); // pick up changes from mk2
        String base3 = mk3.getHeadRevision();

        assertFalse(mk3.nodeExists("/test/mk21", b3));
        b3 = mk3.rebase(b3, base3);

        mk2.commit("/", "+\"test/mk22\":{}", null, null);
        mk2.runBackgroundOperations();

        mk3.runBackgroundOperations(); // pick up changes from mk2

        DocumentNodeState base = ns3.getNode("/", RevisionVector.fromString(base3));
        assertNotNull(base);
        NodeState branchHead = ns3.getNode("/", RevisionVector.fromString(b3));
        assertNotNull(branchHead);
        TrackingDiff diff = new TrackingDiff();
        branchHead.compareAgainstBaseState(base, diff);
        assertEquals(1, diff.added.size());
        assertEquals(Sets.newHashSet("/mk3"), diff.added);
        assertEquals(new HashSet<String>(), diff.deleted);
    }

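    /**
     * Verifies clusterId allocation: an inactive id may be handed out
     * again, even to a different machine (OAK-7316), while a matching
     * machineId/instanceId pair reacquires its previous id.
     */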
    @Test
    public void clusterNodeInfo() {
        MemoryDocumentStore store = new MemoryDocumentStore();
        ClusterNodeInfo c1, c2;

        c1 = ClusterNodeInfo.getInstance(store, NOOP, "m1", null, 0);
        assertEquals(1, c1.getId());
        c1.dispose();

        // get the same id
        c1 = ClusterNodeInfo.getInstance(store, NOOP, "m1", null, 0);
        assertEquals(1, c1.getId());
        c1.dispose();

        // a different machine must get the inactive id (OAK-7316)
        c1 = ClusterNodeInfo.getInstance(store, NOOP, "m2", null, 0);
        assertEquals(1, c1.getId());

        // yet another machine
        c2 = ClusterNodeInfo.getInstance(store, NOOP, "m3", "/a", 0);
        assertEquals(2, c2.getId());

        c1.dispose();
        c2.dispose();

        // must acquire the same id as before, since machineId/instanceId match
        c1 = ClusterNodeInfo.getInstance(store, NOOP, "m3", "/a", 0);
        assertEquals(2, c1.getId());

        c1.dispose();

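        // no existing entry matches machineId "m3" with instanceId "/b",
        // so an inactive id is handed out instead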
        c1 = ClusterNodeInfo.getInstance(store, NOOP, "m3", "/b", 0);
        assertEquals(1, c1.getId());

        c1.dispose();
    }

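    /**
     * Two cluster nodes commit a conflicting change; the second commit
     * must fail, and after background operations both nodes must see
     * the same tree.
     */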
    @Test
    public void conflict() {
        DocumentMK mk1 = createMK(1, 0);
        DocumentMK mk2 = createMK(2, 0);

        String m1r0 = mk1.getHeadRevision();
        String m2r0 = mk2.getHeadRevision();

        mk1.commit("/", "+\"test\":{}", m1r0, null);
        try {
            mk2.commit("/", "+\"test\":{}", m2r0, null);
            fail();
        } catch (DocumentStoreException e) {
            // expected
        }
        mk1.runBackgroundOperations();
        mk2.runBackgroundOperations();

        // node becomes visible after running background operations
        String n1 = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 10, null);
        String n2 = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 10, null);
        assertEquals(n1, n2);
    }

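    /**
     * A change made on one cluster node becomes visible on another one
     * only after the asynchronous background operations have run.
     */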
    @Test
    public void revisionVisibility() throws InterruptedException {
        DocumentMK mk1 = createMK(1);
        DocumentMK mk2 = createMK(2);

        String m2h;
        m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 2, null);
        assertEquals("{\":childNodeCount\":0}", m2h);
        String oldHead = mk2.getHeadRevision();

        mk1.commit("/", "+\"test\":{}", null, null);
        String m1h = mk1.getNodes("/", mk1.getHeadRevision(), 0, 0, 2, null);
        assertEquals("{\"test\":{},\":childNodeCount\":1}", m1h);

        // not available yet...
        assertEquals("{\":childNodeCount\":0}", m2h);
        m2h = mk2.getNodes("/test", mk2.getHeadRevision(), 0, 0, 2, null);

        // the async delay is 10 ms; wait at most 1000 ms
        for (int i = 0; i < 100; i++) {
            Thread.sleep(10);
            if (mk1.getPendingWriteCount() > 0) {
                continue;
            }
            if (mk2.getHeadRevision().equals(oldHead)) {
                continue;
            }
            break;
        }

        // so now it should be available
        m2h = mk2.getNodes("/", mk2.getHeadRevision(), 0, 0, 5, null);
        assertEquals("{\"test\":{},\":childNodeCount\":1}", m2h);
    }

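    /**
     * A commit that fails with a conflict must be rolled back entirely,
     * so that the non-conflicting part can be committed afterwards.
     */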
    @Test
    public void rollbackAfterConflict() {
        DocumentMK mk1 = createMK(1);
        DocumentMK mk2 = createMK(2);

        String m1r0 = mk1.getHeadRevision();
        String m2r0 = mk2.getHeadRevision();

        mk1.commit("/", "+\"test\":{}", m1r0, null);
        try {
            mk2.commit("/", "+\"a\": {} +\"test\":{}", m2r0, null);
            fail();
        } catch (DocumentStoreException e) {
            // expected
        }
        mk2.commit("/", "+\"a\": {}", null, null);
    }

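    /**
     * Changes pulled in from another cluster node must be reported to
     * observers with a root state flagged as an external change.
     */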
    @Test
    public void fromExternalChange() throws Exception {
        final List<DocumentNodeState> rootStates1 = Lists.newArrayList();
        DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
        ns1.addObserver(new Observer() {
            @Override
            public void contentChanged(@NotNull NodeState root,
                                       @NotNull CommitInfo info) {
                rootStates1.add((DocumentNodeState) root);
            }
        });
        final List<DocumentNodeState> rootStates2 = Lists.newArrayList();
        DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
        ns2.addObserver(new Observer() {
            @Override
            public void contentChanged(@NotNull NodeState root,
                                       @NotNull CommitInfo info) {
                rootStates2.add((DocumentNodeState) root);
            }
        });

        ns1.runBackgroundOperations();
        ns2.runBackgroundOperations();

        rootStates1.clear();
        rootStates2.clear();

        NodeBuilder builder = ns1.getRoot().builder();
        builder.child("foo");
        merge(ns1, builder);

        assertEquals(1, rootStates1.size());
        assertEquals(0, rootStates2.size());
        assertFalse(rootStates1.get(0).isFromExternalChange());

        ns1.runBackgroundOperations();
        ns2.runBackgroundOperations();

        assertEquals(1, rootStates1.size());
        assertEquals(1, rootStates2.size());
        assertTrue(rootStates2.get(0).isFromExternalChange());
        NodeState foo = rootStates2.get(0).getChildNode("foo");
        assertTrue(foo instanceof DocumentNodeState);
        assertTrue(((DocumentNodeState) foo).isFromExternalChange());
    }

    @Before
    @After
    public void clear() {
        for (DocumentMK mk : mks) {
            mk.dispose();
        }
        mks.clear();
        if (MONGO_DB) {
            MongoDatabase db = connectionFactory.getConnection().getDatabase();
            MongoUtils.dropCollections(db);
        }
    }

    private static NodeState merge(NodeStore store, NodeBuilder builder)
            throws CommitFailedException {
        return store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    }

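    // the default asyncDelay of 10 ms lets background operations run
    // automatically; tests that drive them manually pass 0 instead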
    private DocumentMK createMK(int clusterId) {
        return createMK(clusterId, 10);
    }

    private DocumentMK createMK(int clusterId, int asyncDelay) {
        if (MONGO_DB) {
            MongoConnection connection = connectionFactory.getConnection();
            return register(new DocumentMK.Builder()
                    .setMongoDB(connection.getMongoClient(), connection.getDBName())
                    .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
        } else {
            if (ds == null) {
                ds = new MemoryDocumentStore();
            }
            if (bs == null) {
                bs = new MemoryBlobStore();
            }
            return createMK(clusterId, asyncDelay, ds, bs);
        }
    }

    private DocumentMK createMK(int clusterId, int asyncDelay,
                                DocumentStore ds, BlobStore bs) {
        return register(new DocumentMK.Builder().setDocumentStore(ds)
                .setBlobStore(bs).setClusterId(clusterId)
                .setAsyncDelay(asyncDelay).open());
    }

    private DocumentMK register(DocumentMK mk) {
        mks.add(mk);
        return mk;
    }

    private void disposeMK(DocumentMK mk) {
        mk.dispose();
        mks.remove(mk);
    }

    private void traverse(NodeState node, String path) {
        for (ChildNodeEntry child : node.getChildNodeEntries()) {
            traverse(child.getNodeState(), PathUtils.concat(path, child.getName()));
        }
    }
}