OAK-9149 : Phased, batched backgroundSplit - merges PR#260
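
backgroundSplit now collects the split UpdateOps of all candidates and
applies them through batched createOrUpdate calls, in two phases:
intermediate/garbage/split ops first, main document updates second.
As the batch calls are not atomic, this ordering ensures that a partial
failure in the first phase leaves the main documents untouched.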
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1883591 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index 35d56e1..4cc51be 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -2451,58 +2451,126 @@
}
private void backgroundSplit() {
- Set<Path> invalidatedPaths = new HashSet<>();
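+        // one batch worth of entries plus a little slack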
+ final int initialCapacity = getCreateOrUpdateBatchSize() + 4;
+ Set<Path> invalidatedPaths = new HashSet<>(initialCapacity);
+ Set<Path> pathsToInvalidate = new HashSet<>(initialCapacity);
RevisionVector head = getHeadRevision();
- for (Iterator<String> it = splitCandidates.keySet().iterator(); it.hasNext();) {
- String id = it.next();
+        // OAK-9149 : with backgroundSplit being done in batches, the
+        // updateOps must be executed in "phases". The reason is that
+        // the (DocumentStore) batch calls are not atomic: they could
+        // potentially be executed only partially, without any guarantee
+        // as to which part is executed and which is not.
+        // The split algorithm, however, requires that a subset of the
+        // operations, namely the intermediate/garbage/split ops, is
+        // executed *before* the main document is updated.
+        // To reflect this necessity in the batch variant, all those
+        // intermediate/garbage/split updateOps are grouped into a first
+        // phase, and the main document updateOps into a second phase.
+        // That way, if the first phase fails (even partially), the main
+        // documents are not yet touched.
+        // TODO: if the split fails, we create actual garbage that cannot
+        // be cleaned up later, since there is no "pointer" to it. That is
+        // something to look at/consider at some point.
+
+ // phase1 therefore only contains intermediate/garbage/split updateOps
+ List<UpdateOp> splitOpsPhase1 = new ArrayList<>(initialCapacity);
+ // phase2 contains main document updateOps.
+ List<UpdateOp> splitOpsPhase2 = new ArrayList<>(initialCapacity);
+ List<String> removeCandidates = new ArrayList<>(initialCapacity);
+ for (String id : splitCandidates.keySet()) {
NodeDocument doc = store.find(Collection.NODES, id);
if (doc == null) {
continue;
}
cleanCollisions(doc, collisionGarbageBatchSize);
- for (UpdateOp op : doc.split(this, head, binarySize)) {
+ Iterator<UpdateOp> it = doc.split(this, head, binarySize).iterator();
+            while (it.hasNext()) {
+ UpdateOp op = it.next();
Path path = doc.getPath();
// add an invalidation journal entry, unless the path
// already has a pending _lastRev update or an invalidation
// entry was already added in this backgroundSplit() call
- if (unsavedLastRevisions.get(path) == null
- && invalidatedPaths.add(path)) {
- // create journal entry for cache invalidation
- JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
- entry.modified(path);
- Revision r = newRevision().asBranchRevision();
- UpdateOp journalOp = entry.asUpdateOp(r);
- if (store.create(JOURNAL, singletonList(journalOp))) {
- changes.invalidate(singletonList(r));
- LOG.debug("Journal entry {} created for split of document {}",
- journalOp.getId(), path);
- } else {
- String msg = "Unable to create journal entry " +
- journalOp.getId() + " for document invalidation. " +
- "Will be retried with next background split " +
- "operation.";
- throw new DocumentStoreException(msg);
- }
+ if (unsavedLastRevisions.get(path) == null && !invalidatedPaths.contains(path)) {
+ pathsToInvalidate.add(path);
}
- // apply the split operations
- NodeDocument before = null;
- if (!op.isNew() ||
- !store.create(Collection.NODES, Collections.singletonList(op))) {
- before = store.createOrUpdate(Collection.NODES, op);
+ // the last entry is the main document update
+ // (as per updated NodeDocument.split documentation).
+ if (it.hasNext()) {
+ splitOpsPhase1.add(op);
+ } else {
+ splitOpsPhase2.add(op);
}
+ }
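+            // mark the candidate; it is removed from splitCandidates only after its ops were applied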
+ removeCandidates.add(id);
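+            // flush both phases once either of them reaches the batch size;
+            // phase1 (intermediate/garbage/split) must be applied before phase2 (main documents)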
+ if (splitOpsPhase1.size() >= getCreateOrUpdateBatchSize()
+ || splitOpsPhase2.size() >= getCreateOrUpdateBatchSize()) {
+ invalidatePaths(pathsToInvalidate);
+ batchSplit(splitOpsPhase1);
+ batchSplit(splitOpsPhase2);
+ invalidatedPaths.addAll(pathsToInvalidate);
+ pathsToInvalidate.clear();
+ splitOpsPhase1.clear();
+ splitOpsPhase2.clear();
+ splitCandidates.keySet().removeAll(removeCandidates);
+ removeCandidates.clear();
+ }
+ }
+
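+        // flush whatever remains as a last, possibly smaller, batch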
+ if (splitOpsPhase1.size() + splitOpsPhase2.size() > 0) {
+ invalidatePaths(pathsToInvalidate);
+ batchSplit(splitOpsPhase1);
+ batchSplit(splitOpsPhase2);
+ splitCandidates.keySet().removeAll(removeCandidates);
+ }
+ }
+
+ private void invalidatePaths(@NotNull Set<Path> pathsToInvalidate) {
+ if (pathsToInvalidate.isEmpty()) {
+ // nothing to do
+ return;
+ }
+ // create journal entry for cache invalidation
+ JournalEntry entry = JOURNAL.newDocument(getDocumentStore());
+ entry.modified(pathsToInvalidate);
+ Revision r = newRevision().asBranchRevision();
+ UpdateOp journalOp = entry.asUpdateOp(r);
+ if (store.create(JOURNAL, singletonList(journalOp))) {
+ changes.invalidate(singletonList(r));
+ LOG.debug("Journal entry {} created for split of document(s) {}",
+ journalOp.getId(), pathsToInvalidate);
+ } else {
+ String msg = "Unable to create journal entry " +
+ journalOp.getId() + " for document invalidation. " +
+ "Will be retried with next background split " +
+ "operation.";
+ throw new DocumentStoreException(msg);
+ }
+ }
+
+ private void batchSplit(@NotNull List<UpdateOp> splitOps) {
+ if (splitOps.isEmpty()) {
+ // nothing to do
+ return;
+ }
+ // apply the split operations
+ List<NodeDocument> beforeList = store.createOrUpdate(Collection.NODES, splitOps);
+ if (LOG.isDebugEnabled()) {
+            // this is rather expensive - but a log.debug per document was done
+            // before the batchSplit mechanism too. Note that, when enabled,
+            // it somewhat negates the batch improvement.
+ for (int i = 0; i < splitOps.size(); i++) {
+ UpdateOp op = splitOps.get(i);
+ NodeDocument before = beforeList.size() > i ? beforeList.get(i) : null;
if (before != null) {
- if (LOG.isDebugEnabled()) {
- NodeDocument after = store.find(Collection.NODES, op.getId());
- if (after != null) {
- LOG.debug("Split operation on {}. Size before: {}, after: {}",
- id, before.getMemory(), after.getMemory());
- }
+ NodeDocument after = store.find(Collection.NODES, op.getId());
+ if (after != null) {
+ LOG.debug("Split operation on {}. Size before: {}, after: {}",
+ op.getId(), before.getMemory(), after.getMemory());
}
} else {
LOG.debug("Split operation created {}", op.getId());
}
}
- it.remove();
}
}
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
index 80d124c..e22ac1d 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
@@ -1170,7 +1170,8 @@
* the document store.
* @param binarySize a function that returns the binary size of the given
* JSON property value String.
- * @return the split operations.
+     * @return the split operations; the last updateOp is guaranteed to be
+     *         the update of the main document (unless the list is empty)
*/
@NotNull
public Iterable<UpdateOp> split(@NotNull RevisionContext context,
diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
index fb52155..d4e0988 100644
--- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
+++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
@@ -81,6 +81,28 @@
lastLogTime = now();
}
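+
+    /** Resets all collected timing statistics and slow-call records. */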
+ public void reset() {
+ startTime = 0;
+ counts.clear();
+ lastLogTime = 0;
+ totalLogTime = 0;
+ slowCalls.clear();
+ }
+
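+    /** @return the recorded call time summed over all counted call types */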
+ public long getOverallTime() {
+ long overallTime = 0;
+ for (Count count : counts.values()) {
+ overallTime += count.total;
+ }
+ return overallTime;
+ }
+
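+    /** Convenience for {@link #getOverallTime()} followed by {@link #reset()}. */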
+ public long getAndResetOverallTime() {
+ final long result = getOverallTime();
+ reset();
+ return result;
+ }
+
private boolean logCommonCall() {
return callCount % 10 == 0;
}
diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java
new file mode 100644
index 0000000..a1b5ce3
--- /dev/null
+++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentBatchSplitTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState.binaryProperty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
+import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+
+/**
+ * Check correct batched splitting of documents (OAK-9149).
+ */
+@RunWith(Parameterized.class)
+public class DocumentBatchSplitTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DocumentBatchSplitTest.class);
+
+ private String createOrUpdateBatchSize;
+ private boolean createOrUpdateBatchSizeIsNull;
+
+ private DocumentStoreFixture fixture;
+ protected DocumentMK mk;
+
+ public DocumentBatchSplitTest(DocumentStoreFixture fixture) {
+ this.fixture = fixture;
+ }
+
+ @Parameterized.Parameters(name="{0}")
+ public static java.util.Collection<Object[]> fixtures() throws IOException {
+ List<Object[]> fixtures = Lists.newArrayList();
+ fixtures.add(new Object[] {new DocumentStoreFixture.MemoryFixture()});
+
+ DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+        if (mongo.isAvailable()) {
+ fixtures.add(new Object[] {mongo});
+ }
+ return fixtures;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mk != null) {
+ mk.dispose();
+ mk = null;
+ }
+ fixture.dispose();
+ // reset log level to default
+ enableLevel("org", null);
+ }
+
+ @Before
+ public void backupProperty() {
+ createOrUpdateBatchSize = System.getProperty("oak.documentMK.createOrUpdateBatchSize");
+ if (createOrUpdateBatchSize == null) {
+ createOrUpdateBatchSizeIsNull = true;
+ }
+ }
+
+ @After
+ public void restoreProperty() {
+ if (createOrUpdateBatchSize != null) {
+ System.setProperty("oak.documentMK.createOrUpdateBatchSize", createOrUpdateBatchSize);
+ } else if (createOrUpdateBatchSizeIsNull) {
+ System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
+ }
+ }
+
+ @Test
+ @Ignore(value = "useful for benchmarking only, long execution duration")
+ public void batchSplitBenchmark() throws Exception {
+ int[] batchSizes = new int[] {1,2,10,30,50,75,100,200,300,400,500,1000,2000,5000,10000};
+ for (int batchSize : batchSizes) {
+ batchSplitTest(batchSize, 10000);
+ batchSplitTest(batchSize, 10000);
+ }
+ }
+
+ @Test
+ public void largeBatchSplit() throws Exception {
+ batchSplitTest(200, 1000);
+ }
+
+ @Test
+ public void mediumBatchSplit() throws Exception {
+ batchSplitTest(50, 1000);
+ }
+
+ @Test
+ public void smallBatchSplit() throws Exception {
+ batchSplitTest(2, 1000);
+ }
+
+ @Test
+ public void noBatchSplit() throws Exception {
+ batchSplitTest(1, 1000);
+ }
+
+    /** Make sure at least one test runs with log level DEBUG, exercising the debug branch in batchSplit. */
+ @Test
+ public void debugLogLevelBatchSplit() throws Exception {
+ enableLevel("org", Level.DEBUG);
+ batchSplitTest(50, 1000);
+ }
+
+ private void batchSplitTest(int batchSize, int splitDocCnt) throws Exception {
+        LOG.info("batchSplitTest: batchSize = " + batchSize + ", splitDocCnt = " + splitDocCnt +
+ ", fixture = " + fixture);
+        // this test wants to use a CountingDocumentStore,
+        // plus it wants to set the batchSize
+ if (mk != null) {
+ mk.dispose();
+ mk = null;
+ }
+ if (fixture.getName().equals("MongoDB")) {
+ MongoUtils.dropCollections(MongoUtils.DB);
+ }
+
+ System.setProperty("oak.documentMK.createOrUpdateBatchSize", String.valueOf(batchSize));
+
+ DocumentMK.Builder mkBuilder = new DocumentMK.Builder();
+ DocumentStore delegateStore = fixture.createDocumentStore();
+ TimingDocumentStoreWrapper timingStore = new TimingDocumentStoreWrapper(delegateStore);
+ CountingDocumentStore store = new CountingDocumentStore(timingStore);
+ mkBuilder.setDocumentStore(store);
+ // disable automatic background operations
+ mkBuilder.setAsyncDelay(0);
+ mk = mkBuilder.open();
+ DocumentNodeStore ns = mk.getNodeStore();
+ assertEquals(batchSize, ns.getCreateOrUpdateBatchSize());
+
+ NodeBuilder builder = ns.getRoot().builder();
+        for (int child = 0; child < 100; child++) {
+ builder.child("testchild-" + child);
+ }
+ ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
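+        // update each child twice so the documents accumulate enough revisions to become split candidates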
+        for (int i = 0; i < 2; i++) {
+            builder = ns.getRoot().builder();
+            for (int child = 0; child < splitDocCnt; child++) {
+ PropertyState binary = binaryProperty("prop", randomBytes(5 * 1024));
+ builder.child("testchild-" + child).setProperty(binary);
+ }
+ ns.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+ timingStore.reset();
+ store.resetCounters();
+ final long start = System.currentTimeMillis();
+ ns.runBackgroundUpdateOperations();
+ int createOrUpdateCalls = store.getNumCreateOrUpdateCalls(NODES);
+ final long remoteCallMeasurement = timingStore.getAndResetOverallTime();
+ final long totalSplitDuration = (System.currentTimeMillis() - start);
+ final long localSplitPart = totalSplitDuration - remoteCallMeasurement;
+ LOG.info("batchSplitTest: batchSize = " + batchSize +
+ ", splitDocCnt = " + splitDocCnt +
+ ", createOrUpdateCalls = " + createOrUpdateCalls +
+ ", fixture = " + fixture.getName() +
+                ", split total ms = " + totalSplitDuration +
+ " (thereof local = " + localSplitPart +
+ ", remote = " + remoteCallMeasurement + ")");
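+        // e.g. splitDocCnt = 1000, batchSize = 200: 2 * 5 full batches + 0 + 1 = 11 calls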
+ int expected = 2 * (splitDocCnt / batchSize) /* 2 calls per batch */
+ + 2 * Math.min(1, splitDocCnt % batchSize) /* 1 additional pair for the last batch */
+ + 1; /* 1 more for backgroundWrite's update to root */
+ assertTrue("batchSize = " + batchSize
+ + ", splitDocCnt = " + splitDocCnt
+ + ", expected=" + expected
+ + ", createOrUpdates=" + createOrUpdateCalls,
+ createOrUpdateCalls >= expected && createOrUpdateCalls <= expected + 2);
+ VersionGarbageCollector gc = ns.getVersionGarbageCollector();
+
+ int actualSplitDocGCCount = 0;
+ long timeout = ns.getClock().getTime() + 20000;
+        while (actualSplitDocGCCount < splitDocCnt && ns.getClock().getTime() < timeout) {
+ VersionGCStats stats = gc.gc(1, TimeUnit.MILLISECONDS);
+ actualSplitDocGCCount += stats.splitDocGCCount;
+ if (actualSplitDocGCCount != splitDocCnt) {
+ LOG.info("batchSplitTest: Expected " + splitDocCnt + ", actual " + actualSplitDocGCCount);
+ // advance time a bit to ensure gc does clean up the split docs
+ ns.getClock().waitUntil(ns.getClock().getTime() + 1000);
+ ns.runBackgroundUpdateOperations();
+ }
+ }
+
+ // make sure those splitDocCnt split docs are deleted
+        assertTrue("gc not as expected: expected at least " + splitDocCnt
+                + ", got " + actualSplitDocGCCount, splitDocCnt <= actualSplitDocGCCount);
+
+ mk.dispose();
+ mk = null;
+ }
+
+ private byte[] randomBytes(int num) {
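+        // fixed seed for reproducible test data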
+ Random random = new Random(42);
+ byte[] data = new byte[num];
+ random.nextBytes(data);
+ return data;
+ }
+
+    // TODO: copied from DocumentStoreStatsTest - but various places, such as
+    // RevisionsCommand and BroadcastTest, have similar code; we might want
+    // to move this to a new common util/helper
+ private static void enableLevel(String logName, Level level){
+ ((LoggerContext)LoggerFactory.getILoggerFactory())
+ .getLogger(logName).setLevel(level);
+ }
+}