blob: d2b63bb72c599b5137e719b311312f23d9f8a797 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.isFinalCommitRootUpdate;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class RollbackTest {
@Rule
public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
private ExecutorService executorService = Executors.newCachedThreadPool();
private Clock clock = new Clock.Virtual();
@Before
public void before() throws Exception {
clock.waitUntil(System.currentTimeMillis());
Revision.setClock(clock);
}
@After
public void after() throws Exception {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
}
@AfterClass
public static void resetClock() {
Revision.resetClockToDefault();
}
@Test
public void nonBlocking() throws Exception {
TestStore store = new TestStore();
DocumentNodeStore ns = builderProvider.newBuilder()
.clock(clock).setDocumentStore(store).setAsyncDelay(0).build();
ns.setMaxBackOffMillis(0); // do not retry commits
// create initial node structure
NodeBuilder builder = ns.getRoot().builder();
builder.child("foo");
builder.child("bar");
builder.child("baz");
merge(ns, builder);
store.failCommitOnce.set(true);
// add a child to each of the nodes.
// the commit root will be on the root document
Future<CommitFailedException> f = executorService.submit(() -> {
NodeBuilder nb = ns.getRoot().builder();
nb.child("foo").child("n");
nb.child("bar").child("n");
nb.child("baz").child("n");
try {
merge(ns, nb);
fail("must throw CommitFailedException");
return null;
} catch (CommitFailedException e) {
// expected
return e;
}
});
store.commitFailed.await();
builder = ns.getRoot().builder();
builder.child("test");
merge(ns, builder);
long t1 = clock.getTime();
assertNotNull(f.get());
long t2 = clock.getTime();
assertFalse(store.failCommitOnce.get());
assertThat(t2 - t1,
greaterThanOrEqualTo(1000L));
}
@Test(expected = DocumentStoreException.class)
public void rollbackFailed() {
Rollback.FAILED.perform(new MemoryDocumentStore());
}
@Test
public void rollbackNone() {
Rollback.NONE.perform(new MemoryDocumentStore());
}
@Test
public void batchSize() {
AtomicInteger maxBatchSize = new AtomicInteger(0);
DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) {
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
List<UpdateOp> updateOps) {
maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size()));
return super.createOrUpdate(collection, updateOps);
}
};
int batchSize = 100;
List<UpdateOp> updates = new ArrayList<>();
for (int i = 0; i < batchSize * 2; i++) {
updates.add(new UpdateOp("id-" + i, false));
}
new Rollback(Revision.newRevision(1),
updates, "id", batchSize).perform(store);
assertThat(maxBatchSize.get(), greaterThan(0));
assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize));
}
private class TestStore extends MemoryDocumentStore {
final AtomicBoolean failCommitOnce = new AtomicBoolean();
final CountDownLatch commitFailed = new CountDownLatch(1);
@Override
public <T extends Document> T findAndUpdate(Collection<T> collection,
UpdateOp update) {
if (collection == Collection.NODES
&& isFinalCommitRootUpdate(update)
&& failCommitOnce.compareAndSet(true, false)) {
commitFailed.countDown();
throw new DocumentStoreException("commit failed");
}
return super.findAndUpdate(collection, update);
}
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
List<UpdateOp> updateOps) {
for (UpdateOp op : updateOps) {
if (isRollbackUpdate(op)) {
try {
// simulate a rollback operation that takes one second
Thread.sleep(10);
clock.waitUntil(clock.getTime() + 1000);
break;
} catch (InterruptedException e) {
throw new DocumentStoreException(e);
}
}
}
return super.createOrUpdate(collection, updateOps);
}
private boolean isRollbackUpdate(UpdateOp update) {
for (Map.Entry<Key, Operation> entry : update.getChanges().entrySet()) {
if (entry.getValue().type == Operation.Type.REMOVE_MAP_ENTRY
&& !entry.getKey().getName().equals(NodeDocument.COMMIT_ROOT)) {
return true;
}
}
return false;
}
}
}