OAK-9149 : Phased, batched backgroundSplit - merges PR#260

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1883591 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index 35d56e1..4cc51be 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -2451,58 +2451,126 @@
     }
 
     private void backgroundSplit() {
-        Set<Path> invalidatedPaths = new HashSet<>();
+        final int initialCapacity = getCreateOrUpdateBatchSize() + 4;
+        Set<Path> invalidatedPaths = new HashSet<>(initialCapacity);
+        Set<Path> pathsToInvalidate = new HashSet<>(initialCapacity);
         RevisionVector head = getHeadRevision();
-        for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
-            String id = it.next();
+        // OAK-9149 : With backgroundSplit being done in batches, the
+        // updateOps must be executed in "phases".
+        // The reason is that the (DocumentStore) batch calls are not
+        // atomic: they could potentially be executed only partially,
+        // with no guarantee about which part was executed and
+        // which was not.
+        // The split algorithm, however, requires that a subset of
+        // the operations, namely the intermediate/garbage/split ops,
+        // is executed *before* the main document is updated.
+        // To preserve this ordering in the batch variant, all those
+        // intermediate/garbage/split updateOps are grouped into a
+        // first phase, and the main document updateOps into a second phase.
+        // That way, if the first phase fails partially, the main documents
+        // have not been touched yet.
+        // TODO: if the split fails, we create actual garbage that cannot
+        // be cleaned up later, since there is no "pointer" to it. That's
+        // something to look at/consider at some point.
+
+        // phase1 therefore only contains intermediate/garbage/split updateOps
+        List<UpdateOp> splitOpsPhase1 = new ArrayList<>(initialCapacity);
+        // phase2 contains main document updateOps.
+        List<UpdateOp> splitOpsPhase2 = new ArrayList<>(initialCapacity);
+        List<String> removeCandidates = new ArrayList<>(initialCapacity);
+        for (String id : splitCandidates.keySet()) {
             NodeDocument doc = store.find(Collection.NODES, id);
             if (doc == null) {
                 continue;
             }
             cleanCollisions(doc, collisionGarbageBatchSize);
-            for (UpdateOp op : doc.split(this, head, binarySize)) {
+            Iterator<UpdateOp> it = doc.split(this, head, binarySize).iterator();
+            while(it.hasNext()) {
+                UpdateOp op = it.next();
                 Path path = doc.getPath();
                 // add an invalidation journal entry, unless the path
                 // already has a pending _lastRev update or an invalidation
                 // entry was already added in this backgroundSplit() call
-                if (unsavedLastRevisions.get(path) == null
-                        && invalidatedPaths.add(path)) {
-                    // create journal entry for cache invalidation
-                    JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
-                    entry.modified(path);
-                    Revision r = newRevision().asBranchRevision();
-                    UpdateOp journalOp = entry.asUpdateOp(r);
-                    if (store.create(JOURNAL, singletonList(journalOp))) {
-                        changes.invalidate(singletonList(r));
-                        LOG.debug("Journal entry {} created for split of document {}",
-                                journalOp.getId(), path);
-                    } else {
-                        String msg = "Unable to create journal entry " +
-                                journalOp.getId() + " for document invalidation. " +
-                                "Will be retried with next background split " +
-                                "operation.";
-                        throw new DocumentStoreException(msg);
-                    }
+                if (unsavedLastRevisions.get(path) == null && !invalidatedPaths.contains(path)) {
+                    pathsToInvalidate.add(path);
                 }
-                // apply the split operations
-                NodeDocument before = null;
-                if (!op.isNew() ||
-                        !store.create(Collection.NODES, Collections.singletonList(op))) {
-                    before = store.createOrUpdate(Collection.NODES, op);
+                // the last entry is the main document update
+                // (as per updated NodeDocument.split documentation).
+                if (it.hasNext()) {
+                    splitOpsPhase1.add(op);
+                } else {
+                    splitOpsPhase2.add(op);
                 }
+            }
+            removeCandidates.add(id);
+            if (splitOpsPhase1.size() >= getCreateOrUpdateBatchSize()
+                    || splitOpsPhase2.size() >= getCreateOrUpdateBatchSize()) {
+                invalidatePaths(pathsToInvalidate);
+                batchSplit(splitOpsPhase1);
+                batchSplit(splitOpsPhase2);
+                invalidatedPaths.addAll(pathsToInvalidate);
+                pathsToInvalidate.clear();
+                splitOpsPhase1.clear();
+                splitOpsPhase2.clear();
+                splitCandidates.keySet().removeAll(removeCandidates);
+                removeCandidates.clear();
+            }
+        }
+
+        if (splitOpsPhase1.size() + splitOpsPhase2.size() > 0) {
+            invalidatePaths(pathsToInvalidate);
+            batchSplit(splitOpsPhase1);
+            batchSplit(splitOpsPhase2);
+            splitCandidates.keySet().removeAll(removeCandidates);
+        }
+    }
+
+    private void invalidatePaths(@NotNull Set<Path> pathsToInvalidate) {
+        if (pathsToInvalidate.isEmpty()) {
+            // nothing to do
+            return;
+        }
+        // create journal entry for cache invalidation
+        JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+        entry.modified(pathsToInvalidate);
+        Revision r = newRevision().asBranchRevision();
+        UpdateOp journalOp = entry.asUpdateOp(r);
+        if (store.create(JOURNAL, singletonList(journalOp))) {
+            changes.invalidate(singletonList(r));
+            LOG.debug("Journal entry {} created for split of document(s) {}",
+                    journalOp.getId(), pathsToInvalidate);
+        } else {
+            String msg = "Unable to create journal entry " +
+                    journalOp.getId() + " for document invalidation. " +
+                    "Will be retried with next background split " +
+                    "operation.";
+            throw new DocumentStoreException(msg);
+        }
+    }
+
+    private void batchSplit(@NotNull List<UpdateOp> splitOps) {
+        if (splitOps.isEmpty()) {
+            // nothing to do
+            return;
+        }
+        // apply the split operations
+        List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES, splitOps);
+        if (LOG.isDebugEnabled()) {
+            // this is rather expensive - but we were doing this log.debug before
+            // the batchSplit mechanism too, so it only somewhat negates the batch improvement
+            for (int i = 0; i < splitOps.size(); i++) {
+                UpdateOp op = splitOps.get(i);
+                NodeDocument before = beforeList.size() > i ? beforeList.get(i) : null;
                 if (before != null) {
-                    if (LOG.isDebugEnabled()) {
-                        NodeDocument after = store.find(Collection.NODES, op.getId());
-                        if (after != null) {
-                            LOG.debug("Split operation on {}. Size before: {}, after: {}",
-                                    id, before.getMemory(), after.getMemory());
-                        }
+                    NodeDocument after = store.find(Collection.NODES, op.getId());
+                    if (after != null) {
+                        LOG.debug("Split operation on {}. Size before: {}, after: {}",
+                                op.getId(), before.getMemory(), after.getMemory());
                     }
                 } else {
                     LOG.debug("Split operation created {}", op.getId());
                 }
             }
-            it.remove();
         }
     }
 
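
The phase split described in the comment above reduces to one ordering rule: persist the intermediate/garbage/split updateOps first, and only then the main-document updateOps, so a partially executed batch can never leave a main document pointing at split documents that were not written. Below is a minimal standalone sketch of that rule, with hypothetical names and a stand-in store interface (not Oak API):

import java.util.ArrayList;
import java.util.List;

public class PhasedBatchSketch {

    /** Hypothetical stand-in for a non-atomic batched createOrUpdate. */
    interface BatchStore {
        void createOrUpdateBatch(List<String> ops);
    }

    /**
     * Phase 1 (split/intermediate/garbage docs) is persisted first. If it fails
     * partially, only unreferenced garbage exists and the split can be retried.
     * Phase 2 (main document updates) runs only after phase 1 completed, so a
     * main document never references split docs that were not written.
     */
    static void flushPhased(BatchStore store, List<String> phase1, List<String> phase2) {
        store.createOrUpdateBatch(phase1);
        store.createOrUpdateBatch(phase2);
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        BatchStore store = applied::addAll;
        flushPhased(store,
                List.of("1:/p/prev-rev-a", "1:/p/prev-rev-b"),   // phase 1: previous/split docs
                List.of("0:/p"));                                // phase 2: main doc update
        // phase-1 ops appear strictly before the main-document op
        System.out.println(applied);
    }
}
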
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
index 80d124c..e22ac1d 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
@@ -1170,7 +1170,8 @@
      *                the document store.
      * @param binarySize a function that returns the binary size of the given
      *                   JSON property value String.
-     * @return the split operations.
+     * @return the split operations, where the last updateOp is guaranteed to be
+     * the update of the main document (unless the returned iterable is empty)
      */
     @NotNull
     public Iterable<UpdateOp> split(@NotNull RevisionContext context,
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
index fb52155..d4e0988 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
@@ -81,6 +81,28 @@
         lastLogTime = now();
     }
 
+    public void reset() {
+        startTime = 0;
+        counts.clear();
+        lastLogTime = 0;
+        totalLogTime = 0;
+        slowCalls.clear();
+    }
+
+    public long getOverallTime() {
+        long overallTime = 0;
+        for (Count count : counts.values()) {
+            overallTime += count.total;
+        }
+        return overallTime;
+    }
+
+    public long getAndResetOverallTime() {
+        final long result = getOverallTime();
+        reset();
+        return result;
+    }
+
     private boolean logCommonCall() {
         return callCount % 10 == 0;
     }
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java
new file mode 100644
index 0000000..a1b5ce3
--- /dev/null
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState.binaryProperty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+
+/**
+ * Check correct batched splitting of documents (OAK-9149).
+ */
+@RunWith(Parameterized.class)
+public class DocumentBatchSplitTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DocumentBatchSplitTest.class);
+
+    private String createOrUpdateBatchSize;
+    private boolean createOrUpdateBatchSizeIsNull;
+
+    private DocumentStoreFixture fixture;
+    protected DocumentMK mk;
+
+    public DocumentBatchSplitTest(DocumentStoreFixture fixture) {
+        this.fixture = fixture;
+    }
+
+    @Parameterized.Parameters(name="{0}")
+    public static java.util.Collection<Object[]> fixtures() throws IOException {
+        List<Object[]> fixtures = Lists.newArrayList();
+        fixtures.add(new Object[] {new DocumentStoreFixture.MemoryFixture()});
+
+        DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+        if(mongo.isAvailable()){
+            fixtures.add(new Object[] {mongo});
+        }
+        return fixtures;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (mk != null) {
+            mk.dispose();
+            mk = null;
+        }
+        fixture.dispose();
+        // reset log level to default
+        enableLevel("org", null);
+    }
+
+    @Before
+    public void backupProperty() {
+        createOrUpdateBatchSize = System.getProperty("oak.documentMK.createOrUpdateBatchSize");
+        if (createOrUpdateBatchSize == null) {
+            createOrUpdateBatchSizeIsNull = true;
+        }
+    }
+
+    @After
+    public void restoreProperty() {
+        if (createOrUpdateBatchSize != null) {
+            System.setProperty("oak.documentMK.createOrUpdateBatchSize", createOrUpdateBatchSize);
+        } else if (createOrUpdateBatchSizeIsNull) {
+            System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
+        }
+    }
+
+    @Test
+    @Ignore(value = "useful for benchmarking only, long execution duration")
+    public void batchSplitBenchmark() throws Exception {
+        int[] batchSizes = new int[] {1,2,10,30,50,75,100,200,300,400,500,1000,2000,5000,10000};
+        for (int batchSize : batchSizes) {
+            batchSplitTest(batchSize, 10000);
+            batchSplitTest(batchSize, 10000);
+        }
+    }
+
+    @Test
+    public void largeBatchSplit() throws Exception {
+        batchSplitTest(200, 1000);
+    }
+
+    @Test
+    public void mediumBatchSplit() throws Exception {
+        batchSplitTest(50, 1000);
+    }
+
+    @Test
+    public void smallBatchSplit() throws Exception {
+        batchSplitTest(2, 1000);
+    }
+
+    @Test
+    public void noBatchSplit() throws Exception {
+        batchSplitTest(1, 1000);
+    }
+
+    /** Make sure we have a test that runs with the log level set to DEBUG */
+    @Test
+    public void debugLogLevelBatchSplit() throws Exception {
+        enableLevel("org", Level.DEBUG);
+        batchSplitTest(50, 1000);
+    }
+
+    private void batchSplitTest(int batchSize, int splitDocCnt) throws Exception {
+        LOG.info("batchSplitTest: batchSize = " + batchSize+ ", splitDocCnt = " + splitDocCnt +
+                ", fixture = " + fixture);
+        // this tests wants to use CountingDocumentStore
+        // plus it wants to set the batchSize
+        if (mk != null) {
+            mk.dispose();
+            mk = null;
+        }
+        if (fixture.getName().equals("MongoDB")) {
+            MongoUtils.dropCollections(MongoUtils.DB);
+        }
+
+        System.setProperty("oak.documentMK.createOrUpdateBatchSize", String.valueOf(batchSize));
+
+        DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
+        DocumentStore delegateStore = fixture.createDocumentStore();
+        TimingDocumentStoreWrapper timingStore = new TimingDocumentStoreWrapper(delegateStore);
+        CountingDocumentStore store = new CountingDocumentStore(timingStore);
+        mkBuilder.setDocumentStore(store);
+        // disable automatic background operations
+        mkBuilder.setAsyncDelay(0);
+        mk = mkBuilder.open();
+        DocumentNodeStore ns = mk.getNodeStore();
+        assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
+
+        NodeBuilder builder = ns.getRoot().builder();
+        for(int child = 0; child < 100; child++) {
+            builder.child("testchild-" + child);
+        }
+        ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        for(int i=0; i<2; i++) {
+            builder = ns.getRoot().builder();
+            for(int child = 0; child < splitDocCnt; child++) {
+                PropertyState binary = binaryProperty("prop", randomBytes(5 * 1024));
+                builder.child("testchild-" + child).setProperty(binary);
+            }
+            ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        }
+        timingStore.reset();
+        store.resetCounters();
+        final long start = System.currentTimeMillis();
+        ns.runBackgroundUpdateOperations();
+        int createOrUpdateCalls = store.getNumCreateOrUpdateCalls(NODES);
+        final long remoteCallMeasurement = timingStore.getAndResetOverallTime();
+        final long totalSplitDuration = (System.currentTimeMillis() - start);
+        final long localSplitPart = totalSplitDuration - remoteCallMeasurement;
+        LOG.info("batchSplitTest: batchSize = " + batchSize +
+                ", splitDocCnt = " + splitDocCnt +
+                ", createOrUpdateCalls = " + createOrUpdateCalls +
+                ", fixture = " + fixture.getName() +
+                ", split total ms = " + (System.currentTimeMillis() - start) +
+                " (thereof local = " + localSplitPart +
+                ", remote = " + remoteCallMeasurement + ")");
+        int expected = 2 * (splitDocCnt / batchSize)       /* 2 calls per batch */
+                + 2 * Math.min(1, splitDocCnt % batchSize) /* 1 additional pair for the last batch */
+                + 1;                                       /* 1 more for backgroundWrite's update to root */
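+        // e.g. splitDocCnt = 1000, batchSize = 200: 2 * 5 + 2 * 0 + 1 = 11 calls;
+        //      splitDocCnt = 1000, batchSize = 300: 2 * 3 + 2 * 1 + 1 = 9 calls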
+        assertTrue("batchSize = " + batchSize
+                + ", splitDocCnt = " + splitDocCnt
+                + ", expected=" + expected
+                + ", createOrUpdates=" + createOrUpdateCalls,
+                createOrUpdateCalls >= expected && createOrUpdateCalls <= expected + 2);
+        VersionGarbageCollector gc = ns.getVersionGarbageCollector();
+
+        int actualSplitDocGCCount = 0;
+        long timeout = ns.getClock().getTime() + 20000;
+        while(actualSplitDocGCCount < splitDocCnt && ns.getClock().getTime() < timeout) {
+            VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
+            actualSplitDocGCCount += stats.splitDocGCCount;
+            if (actualSplitDocGCCount != splitDocCnt) {
+                LOG.info("batchSplitTest: Expected " + splitDocCnt + ", actual " + actualSplitDocGCCount);
+                // advance time a bit to ensure gc does clean up the split docs
+                ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
+                ns.runBackgroundUpdateOperations();
+            }
+        }
+
+        // make sure those splitDocCnt split docs are deleted
+        assertTrue("gc not as expected: expected " + splitDocCnt
+                + ", got " + actualSplitDocGCCount, splitDocCnt <= actualSplitDocGCCount);
+
+        mk.dispose();
+        mk = null;
+    }
+
+    private byte[] randomBytes(int num) {
+        Random random = new Random(42);
+        byte[] data = new byte[num];
+        random.nextBytes(data);
+        return data;
+    }
+
+    // TODO: taken from DocumentStoreStatsTest, but various places such as
+    // RevisionsCommand and BroadcastTest have similar code.
+    // We might want to move this to a common util/helper.
+    private static void enableLevel(String logName, Level level){
+        ((LoggerContext)LoggerFactory.getILoggerFactory())
+                .getLogger(logName).setLevel(level);
+    }
+}