OAK-8591: Conflict exception on commit

Merge revisions 1866382,1866697,1866730 from trunk

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/branches/1.8@1866751 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
index ca64786..b1adbec 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
@@ -872,19 +872,15 @@
         if (validRevisions.containsKey(rev)) {
             return true;
         }
-        if (Utils.isCommitted(commitValue) && !readRevision.isBranch()) {
-            // no need to load commit root document, we can simply
-            // tell by looking at the commit revision whether the
-            // revision is valid/visible
-            Revision commitRev = Utils.resolveCommitRevision(rev, commitValue);
-            return !readRevision.isRevisionNewer(commitRev);
+        // get the commit value if it is not yet available
+        if (commitValue == null) {
+            commitValue = context.getCommitValue(rev, this);
         }
-
-        NodeDocument doc = getCommitRoot(rev);
-        if (doc == null) {
+        if (commitValue == null) {
+            // this change is not committed, hence not valid/visible
             return false;
         }
-        if (doc.isVisible(context, rev, commitValue, readRevision)) {
+        if (isVisible(context, rev, commitValue, readRevision)) {
             validRevisions.put(rev, commitValue);
             return true;
         }
@@ -2051,22 +2047,15 @@
      * to check.
      *
      * @param revision  the revision to check.
-     * @param commitValue the commit value of the revision to check or
-     *                    <code>null</code> if unknown.
+     * @param commitValue the commit value of the revision to check.
      * @param readRevision the read revision.
      * @return <code>true</code> if the revision is visible, otherwise
      *         <code>false</code>.
      */
     private boolean isVisible(@NotNull RevisionContext context,
                               @NotNull Revision revision,
-                              @Nullable String commitValue,
+                              @NotNull String commitValue,
                               @NotNull RevisionVector readRevision) {
-        if (commitValue == null) {
-            commitValue = context.getCommitValue(revision, this);
-        }
-        if (commitValue == null) {
-            return false;
-        }
         if (Utils.isCommitted(commitValue)) {
             Branch b = context.getBranches().getBranch(readRevision);
             if (b == null) {
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
index 5530755..d97c13f 100644
--- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.lang.ref.ReferenceQueue;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -23,6 +24,9 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -50,7 +54,9 @@
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
 import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -473,6 +479,102 @@
     }
 
     @Test
+    public void getNewestRevisionAfterGC() throws Exception {
+        getNewestRevisionAfterGC(false);
+    }
+
+    @Test
+    public void getNewestRevisionAfterGCWithBranchCommit() throws Exception {
+        getNewestRevisionAfterGC(true);
+    }
+
+    @Test
+    public void getNodeAtRevisionAfterGC() throws Exception {
+        getNodeAtRevisionAfterGC(false);
+    }
+
+    @Test
+    public void getNodeAtRevisionAfterGCWithBranchRevision() throws Exception {
+        getNodeAtRevisionAfterGC(true);
+    }
+
+    private void getNodeAtRevisionAfterGC(boolean withBranch) throws Exception {
+        DocumentStore store = new MemoryDocumentStore();
+        Revision r = populateStoreAndGC(store);
+
+        // start fresh
+        DocumentNodeStore ns = createTestStore(store, 1, 0, 0);
+        String id = Utils.getIdFromPath("/bar/test");
+        NodeDocument doc = store.find(NODES, id);
+        assertNotNull(doc);
+
+        RevisionVector changeRev = new RevisionVector(r);
+        if (withBranch) {
+            changeRev = changeRev.asBranchRevision(1);
+        }
+        DocumentNodeState state = doc.getNodeAtRevision(ns, changeRev, null);
+        assertNotNull(state);
+        assertEquals(changeRev.asTrunkRevision(), state.getLastRevision());
+        assertNotNull(state.getProperty("p"));
+    }
+
+    private void getNewestRevisionAfterGC(boolean withBranch) throws Exception {
+        DocumentStore store = new MemoryDocumentStore();
+        Revision r = populateStoreAndGC(store);
+
+        // start fresh
+        DocumentNodeStore ns = createTestStore(store, 1, 0, 0);
+        String id = Utils.getIdFromPath("/bar/test");
+        NodeDocument doc = store.find(NODES, id);
+        assertNotNull(doc);
+
+        RevisionVector baseRev = ns.getHeadRevision();
+        Revision change = ns.newRevision();
+        Branch branch = null;
+        if (withBranch) {
+            SortedSet<Revision> branchCommits = new TreeSet<>(StableRevisionComparator.REVERSE);
+            branchCommits.add(change);
+            branch = new Branch(branchCommits, baseRev, new ReferenceQueue<>(), null);
+            baseRev = baseRev.asBranchRevision(1);
+        }
+        Revision rev = doc.getNewestRevision(ns, baseRev, change, branch, new HashSet<>());
+        assertEquals(r, rev);
+    }
+
+    private Revision populateStoreAndGC(DocumentStore store) throws Exception {
+        DocumentNodeStore ns = createTestStore(store, 1, 0);
+        NodeBuilder builder = ns.getRoot().builder();
+        builder.child("foo");
+        builder.child("bar").child("test").setProperty("p", "v");
+        merge(ns, builder);
+        // remember the revision
+        Revision r = ns.getHeadRevision().getRevision(1);
+        // perform changes that move the commit information
+        // to split documents
+        for (int i = 0; i < 100; i++) {
+            builder = ns.getRoot().builder();
+            builder.setProperty("p", "v-" + i);
+            merge(ns, builder);
+        }
+        ns.runBackgroundOperations();
+        String rootId = Utils.getIdFromPath("/");
+        NodeDocument rootDoc = store.find(NODES, rootId);
+        assertNotNull(rootDoc);
+        assertThat(rootDoc.getPreviousRanges().keySet(), not(empty()));
+
+        // trigger revision gc until split doc is removed
+        while (!rootDoc.getPreviousRanges().keySet().isEmpty()) {
+            ns.getVersionGarbageCollector().gc(1, TimeUnit.MILLISECONDS);
+            rootDoc = store.find(NODES, rootId);
+            assertNotNull(rootDoc);
+        }
+
+        ns.dispose();
+
+        return r;
+    }
+
+    @Test
     public void getChanges() throws Exception {
         final int numChanges = 200;
         Random random = new Random();