blob: 6f43635d5c1285877cbf3da81850cd416b43a1f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static com.google.common.collect.Lists.newArrayList;
import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class LastRevRecoveryTest {
@Rule
public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
private Clock clock;
private DocumentNodeStore ds1;
private DocumentNodeStore ds2;
private int c1Id;
private int c2Id;
private MemoryDocumentStore sharedStore;
@Before
public void setUp() throws Exception {
clock = new Clock.Virtual();
clock.waitUntil(System.currentTimeMillis());
Revision.setClock(clock);
ClusterNodeInfo.setClock(clock);
// disable lease check because we fiddle with the virtual clock
final LeaseCheckMode leaseCheck = LeaseCheckMode.DISABLED;
sharedStore = new MemoryDocumentStore();
ds1 = builderProvider.newBuilder()
.clock(clock)
.setLeaseCheckMode(leaseCheck)
.setAsyncDelay(0)
.setDocumentStore(sharedStore)
.setClusterId(1)
.getNodeStore();
c1Id = ds1.getClusterId();
ds2 = builderProvider.newBuilder()
.clock(clock)
.setLeaseCheckMode(leaseCheck)
.setAsyncDelay(0)
.setDocumentStore(sharedStore)
.setClusterId(2)
.getNodeStore();
c2Id = ds2.getClusterId();
}
@After
public void tearDown() {
ds1.dispose();
ds2.dispose();
ClusterNodeInfo.resetClockToDefault();
Revision.resetClockToDefault();
}
@Test
public void testRecover() throws Exception {
//1. Create base structure /x/y
NodeBuilder b1 = ds1.getRoot().builder();
b1.child("x").child("y");
ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
ds1.runBackgroundOperations();
//lastRev are persisted directly for new nodes. In case of
// updates they are persisted via background jobs
//1.2 Get last rev populated for root node for ds2
ds2.runBackgroundOperations();
NodeBuilder b2 = ds2.getRoot().builder();
b2.child("x").setProperty("f1","b1");
ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
ds2.runBackgroundOperations();
//2. Add a new node /x/y/z
b2 = ds2.getRoot().builder();
b2.child("x").child("y").child("z").setProperty("foo", "bar");
ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
//Refresh DS1
ds1.runBackgroundOperations();
NodeDocument z1 = getDocument(ds1, "/x/y/z");
NodeDocument y1 = getDocument(ds1, "/x/y");
NodeDocument x1 = getDocument(ds1, "/x");
Revision zlastRev2 = z1.getLastRev().get(c2Id);
// /x/y/z is a new node and does not have a _lastRev
assertNull(zlastRev2);
Revision head2 = ds2.getHeadRevision().getRevision(c2Id);
//lastRev should not be updated for C #2
assertNull(y1.getLastRev().get(c2Id));
LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(sharedStore, ds1);
//Do not pass y1 but still y1 should be updated
recovery.recover(Lists.newArrayList(x1, z1), c2Id);
//Post recovery the lastRev should be updated for /x/y and /x
assertEquals(head2, getDocument(ds1, "/x/y").getLastRev().get(c2Id));
assertEquals(head2, getDocument(ds1, "/x").getLastRev().get(c2Id));
assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id));
}
// OAK-3079
@Test
public void recoveryWithoutRootUpdate() throws Exception {
String clusterId = String.valueOf(c1Id);
ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
NodeBuilder builder = ds1.getRoot().builder();
builder.child("x").child("y").child("z");
merge(ds1, builder);
ds1.dispose();
// reset clusterNodes entry to simulate a crash
sharedStore.remove(CLUSTER_NODES, clusterId);
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
// 'wait' until lease expires
clock.waitUntil(doc.getLeaseEndTime() + 1);
// run recovery on ds2
LastRevRecoveryAgent agent = new LastRevRecoveryAgent(sharedStore, ds2);
Iterable<Integer> clusterIds = agent.getRecoveryCandidateNodes();
assertTrue(Iterables.contains(clusterIds, c1Id));
assertEquals("must not recover any documents",
0, agent.recover(c1Id));
}
// OAK-3488
@Test
public void recoveryWithTimeout() throws Exception {
String clusterId = String.valueOf(c1Id);
ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
NodeBuilder builder = ds1.getRoot().builder();
builder.child("x").child("y").child("z");
merge(ds1, builder);
ds1.dispose();
// reset clusterNodes entry to simulate a crash
sharedStore.remove(CLUSTER_NODES, clusterId);
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
// 'wait' until lease expires
clock.waitUntil(doc.getLeaseEndTime() + 1);
// simulate ongoing recovery by cluster node 2
MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock);
seeker.acquireRecoveryLock(c1Id, c2Id);
// run recovery from ds1
LastRevRecoveryAgent a1 = new LastRevRecoveryAgent(sharedStore, ds1);
// use current time -> do not wait for recovery of other agent
assertEquals(-1, a1.recover(c1Id, clock.getTime()));
seeker.releaseRecoveryLock(c1Id, true);
assertEquals(0, a1.recover(c1Id, clock.getTime() + 1000));
}
// OAK-3488
@Test
public void failStartupOnRecoveryTimeout() throws Exception {
String clusterId = String.valueOf(c1Id);
ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
assertNotNull(doc);
NodeBuilder builder = ds1.getRoot().builder();
builder.child("x").child("y").child("z");
merge(ds1, builder);
ds1.dispose();
// reset clusterNodes entry to simulate a crash
sharedStore.remove(CLUSTER_NODES, clusterId);
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
// 'wait' until lease expires
clock.waitUntil(doc.getLeaseEndTime() + 1);
// make sure ds2 lease is still fine
assertTrue(ds2.getClusterInfo().renewLease());
// simulate ongoing recovery by cluster node 2
MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock);
assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
// attempt to restart ds1 while lock is acquired
try {
ds1 = new DocumentMK.Builder()
.clock(clock)
.setDocumentStore(sharedStore)
.setClusterId(c1Id)
.getNodeStore();
fail("DocumentStoreException expected");
} catch (DocumentStoreException e) {
// expected
assertThat(e.getMessage(), containsString("needs recovery"));
}
seeker.releaseRecoveryLock(c1Id, true);
}
// OAK-3488
@Test
public void breakRecoveryLockWithExpiredLease() throws Exception {
String clusterId = String.valueOf(c1Id);
ClusterNodeInfoDocument info1 = sharedStore.find(CLUSTER_NODES, clusterId);
assertNotNull(info1);
NodeBuilder builder = ds1.getRoot().builder();
builder.child("x").child("y").child("z");
merge(ds1, builder);
ds1.dispose();
// reset clusterNodes entry to simulate a crash of ds1
sharedStore.remove(CLUSTER_NODES, clusterId);
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(info1)));
// 'wait' until lease expires
clock.waitUntil(info1.getLeaseEndTime() + 1);
// make sure ds2 lease is still fine
ds2.getClusterInfo().renewLease();
// start of recovery by ds2
MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock);
assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
// simulate crash of ds2
ClusterNodeInfoDocument info2 = sharedStore.find(CLUSTER_NODES, String.valueOf(c2Id));
assertNotNull(info2);
ds2.dispose();
// reset clusterNodes entry
sharedStore.remove(CLUSTER_NODES, String.valueOf(c2Id));
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(info2)));
// 'wait' until ds2's lease expires
clock.waitUntil(info2.getLeaseEndTime() + 1);
info1 = sharedStore.find(CLUSTER_NODES, clusterId);
assertNotNull(info1);
assertTrue(info1.isRecoveryNeeded(clock.getTime()));
assertTrue(info1.isBeingRecovered());
// restart ds1
ds1 = builderProvider.newBuilder()
.clock(clock)
.setLeaseCheckMode(LeaseCheckMode.DISABLED)
.setAsyncDelay(0)
.setDocumentStore(sharedStore)
.setClusterId(1)
.getNodeStore();
info1 = sharedStore.find(CLUSTER_NODES, clusterId);
assertNotNull(info1);
assertFalse(info1.isRecoveryNeeded(clock.getTime()));
assertFalse(info1.isBeingRecovered());
}
@Test
public void recoveryMustNotPerformInitialSweep() throws Exception {
String clusterId = String.valueOf(c1Id);
ClusterNodeInfoDocument info1 = sharedStore.find(CLUSTER_NODES, clusterId);
NodeBuilder builder = ds1.getRoot().builder();
builder.child("x").child("y").child("z");
merge(ds1, builder);
ds1.dispose();
// reset clusterNodes entry to simulate a crash of ds1
sharedStore.remove(CLUSTER_NODES, clusterId);
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(info1)));
// remove the sweep revision as well
UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
op.removeMapEntry("_sweepRev", new Revision(0, 0, c1Id));
assertNotNull(sharedStore.findAndUpdate(NODES, op));
NodeDocument doc = getRootDocument(sharedStore);
assertNull(doc.getSweepRevisions().getRevision(c1Id));
// 'wait' until lease expires
clock.waitUntil(info1.getLeaseEndTime() + 1);
// make sure ds2 lease is still fine
ds2.getClusterInfo().renewLease();
// run recovery on ds2 for ds1
LastRevRecoveryAgent agent = new LastRevRecoveryAgent(sharedStore, ds2);
Iterable<Integer> clusterIds = agent.getRecoveryCandidateNodes();
assertTrue(Iterables.contains(clusterIds, c1Id));
// nothing to recover
assertEquals("must not recover any documents",
0, agent.recover(c1Id));
// must not set sweep revision
doc = getRootDocument(sharedStore);
assertNull(doc.getSweepRevisions().getRevision(c1Id));
}
@Test
public void selfRecoveryPassedDeadline() throws Exception {
String clusterId = String.valueOf(c1Id);
ClusterNodeInfoDocument info1 = sharedStore.find(CLUSTER_NODES, clusterId);
assertNotNull(info1);
NodeBuilder builder = ds1.getRoot().builder();
builder.child("x").child("y").child("z");
merge(ds1, builder);
ds1.dispose();
// reset clusterNodes entry to simulate a crash of ds1
sharedStore.remove(CLUSTER_NODES, clusterId);
sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(info1)));
// 'wait' until lease expires
clock.waitUntil(info1.getLeaseEndTime() + 1);
AtomicBoolean delay = new AtomicBoolean(true);
// simulate a startup with self-recovery by acquiring the clusterId
// this will call the recovery handler because the lease is expired
// use a seeker that takes longer than the lease duration
MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore, clock) {
@Override
public boolean acquireRecoveryLock(int clusterId, int recoveredBy) {
assertTrue(super.acquireRecoveryLock(clusterId, recoveredBy));
if (delay.get()) {
try {
clock.waitUntil(clock.getTime() + ClusterNodeInfo.DEFAULT_LEASE_DURATION_MILLIS + 1);
} catch (InterruptedException e) {
fail();
}
}
return true;
}
};
RecoveryHandler recoveryHandler = new RecoveryHandlerImpl(
sharedStore, clock, seeker);
try {
// Explicitly acquiring the clusterId must fail
// when it takes too long to recover
ClusterNodeInfo.getInstance(sharedStore, recoveryHandler,
null, null, c1Id);
fail("must fail with DocumentStoreException");
} catch (DocumentStoreException e) {
assertThat(e.getMessage(), containsString("needs recovery"));
}
// But must succeed with auto-assignment of clusterId
// even if machineId and instanceId match
ClusterNodeInfo cni = ClusterNodeInfo.getInstance(sharedStore,
recoveryHandler,null, null, 0);
// though clusterId must not be the one that took too long to recover
assertNotEquals(c1Id, cni.getId());
// hence recovery is still needed for c1Id
assertTrue(seeker.isRecoveryNeeded());
cni.dispose();
// now run again without delay with the explicit clusterId
delay.set(false);
// must succeed now
cni = ClusterNodeInfo.getInstance(sharedStore, recoveryHandler,
null, null, c1Id);
assertEquals(c1Id, cni.getId());
cni.dispose();
}
private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) {
return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
}
private static void merge(NodeStore store, NodeBuilder builder)
throws CommitFailedException {
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
private static UpdateOp updateOpFromDocument(Document doc) {
UpdateOp op = new UpdateOp(doc.getId(), true);
for (String key : doc.keySet()) {
if (key.equals(Document.ID)) {
continue;
}
Object obj = doc.get(key);
if (obj instanceof Map) {
@SuppressWarnings("unchecked")
Map<Revision, String> map = (Map<Revision, String>) obj;
for (Map.Entry<Revision, String> entry : map.entrySet()) {
op.setMapEntry(key, entry.getKey(), entry.getValue());
}
} else {
if (obj instanceof Boolean) {
op.set(key, (Boolean) obj);
} else if (obj instanceof Number) {
op.set(key, ((Number) obj).longValue());
} else if (obj != null) {
op.set(key, obj.toString());
} else {
op.set(key, null);
}
}
}
return op;
}
}