/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.jackrabbit.oak.plugins.document; |
| |
| import java.util.UUID; |
| import java.util.concurrent.Semaphore; |
| |
| import junitx.util.PrivateAccessor; |
| import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; |
| import org.apache.jackrabbit.oak.stats.Clock; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import static org.junit.Assert.assertNotNull; |
| import static org.mockito.AdditionalAnswers.delegatesTo; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.anyInt; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| |
| public class DocumentDiscoveryLiteInvisibleServiceCrashTest |
| extends BaseDocumentDiscoveryLiteServiceTest { |
| |
| private static final int TEST_WAIT_TIMEOUT = 10000; |
| |
| private Clock clock; |
| private DocumentStore store; |
| private String wd1; |
| private DocumentNodeStore ns1; |
| private String wd2; |
| private DocumentNodeStore ns2; |
| |
| @Before |
| public void setup() throws Exception { |
| clock = new Clock.Virtual(); |
| clock.waitUntil(System.currentTimeMillis()); |
| ClusterNodeInfo.setClock(clock); |
| store = new MemoryDocumentStore(); |
| wd1 = UUID.randomUUID().toString(); |
| wd2 = UUID.randomUUID().toString(); |
| } |
| |
| @After |
| public void reset() { |
| ns1.dispose(); |
| ns2.dispose(); |
| ClusterNodeInfo.resetClockToDefault(); |
| } |
| |
| @Test |
| public void testTwoNodesWithCrashAndLongduringRecovery() throws Throwable { |
| doTestTwoNodesWithCrashAndLongduringDeactivation(false); |
| } |
| |
| @Test |
| public void testTwoNodesWithCrashAndLongduringRecoveryAndBacklog() throws Throwable { |
| doTestTwoNodesWithCrashAndLongduringDeactivation(true); |
| } |
| |
| private void doTestTwoNodesWithCrashAndLongduringDeactivation(boolean withBacklog) throws Throwable { |
| ns1 = newDocumentNodeStore(store, wd1); |
| SimplifiedInstance s1 = createInstance(ns1, wd1); |
| ViewExpectation e1 = new ViewExpectation(s1); |
| e1.setActiveIds(ns1.getClusterId()); |
| waitFor(e1, TEST_WAIT_TIMEOUT, "first should see itself active"); |
| ns1.runBackgroundOperations(); |
| |
| ns2 = newDocumentNodeStore(store, wd2, true); |
| SimplifiedInstance s2 = createInstance(ns2, wd2); |
| ViewExpectation e2 = new ViewExpectation(s2); |
| e2.setActiveIds(ns1.getClusterId()); |
| waitFor(e2, TEST_WAIT_TIMEOUT, "second should see only first active"); |
| ns2.runBackgroundOperations(); |
| |
| ns1.runBackgroundReadOperations(); |
| // now ns1 should also see both active |
| ViewExpectation e3 = new ViewExpectation(s1); |
| e3.setActiveIds(ns1.getClusterId()); |
| waitFor(e3, TEST_WAIT_TIMEOUT, "first should see only itself as active"); |
| |
| // before crashing s2, make sure that s1's lastRevRecovery thread |
| // doesn't run |
| s1.stopLastRevThread(); |
| if (withBacklog) { |
| // if we want to do backlog testing, then s2 should write |
| // something |
| // before it crashes, so here it comes: |
| s2.addNode("/foo/bar"); |
| s2.setProperty("/foo/bar", "prop", "value"); |
| } |
| |
| // then wait 2 sec |
| clock.waitUntil(clock.getTime() + 2000); |
| |
| s2.crash(); |
| |
| // then wait 2 sec |
| clock.waitUntil(clock.getTime() + 2000); |
| |
| // at this stage, while s2 has crashed, we have stopped s1's |
| // lastRevRecoveryThread, so we should still see both as active |
| logger.info(s1.getClusterViewStr()); |
| final ViewExpectation expectation1AfterCrashBeforeLastRevRecovery = new ViewExpectation(s1); |
| expectation1AfterCrashBeforeLastRevRecovery.setActiveIds(ns1.getClusterId()); |
| waitFor(expectation1AfterCrashBeforeLastRevRecovery, TEST_WAIT_TIMEOUT, "first should still see only itself as active"); |
| |
| // the next part is a bit tricky: we want to fine-control the |
| // lastRevRecoveryThread's acquire/release locking. |
| // the chosen way to do this is to make heavy use of mockito and two |
| // semaphores: |
| // when acquireRecoveryLock is called, that thread should wait for the |
| // waitBeforeLocking semaphore to be released |
| final MissingLastRevSeeker missingLastRevUtil = (MissingLastRevSeeker) PrivateAccessor |
| .getField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil"); |
| assertNotNull(missingLastRevUtil); |
| MissingLastRevSeeker mockedLongduringMissingLastRevUtil = mock(MissingLastRevSeeker.class, delegatesTo(missingLastRevUtil)); |
| final Semaphore waitBeforeLocking = new Semaphore(0); |
| doAnswer(new Answer<Boolean>() { |
| @Override |
| public Boolean answer(InvocationOnMock invocation) throws Throwable { |
| logger.info("going to waitBeforeLocking"); |
| waitBeforeLocking.acquire(); |
| logger.info("done with waitBeforeLocking"); |
| return missingLastRevUtil.acquireRecoveryLock((Integer) invocation.getArguments()[0], |
| (Integer) invocation.getArguments()[1]); |
| } |
| }).when(mockedLongduringMissingLastRevUtil).acquireRecoveryLock(anyInt(), anyInt()); |
| PrivateAccessor.setField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil", mockedLongduringMissingLastRevUtil); |
| |
| // so let's start the lastRevThread again and wait for that |
| // waitBeforeLocking semaphore to be hit |
| s1.startLastRevThread(); |
| waitFor(new Expectation() { |
| |
| @Override |
| public String fulfilled() throws Exception { |
| if (!waitBeforeLocking.hasQueuedThreads()) { |
| return "no thread queued"; |
| } |
| return null; |
| } |
| |
| }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should acquire a lock"); |
| |
| // at this stage the crashed s2 is still not in recovery mode, so let's |
| // check: |
| logger.info(s1.getClusterViewStr()); |
| final ViewExpectation expectation1AfterCrashBeforeLastRevRecoveryLocking = new ViewExpectation(s1); |
| expectation1AfterCrashBeforeLastRevRecoveryLocking.setActiveIds(ns1.getClusterId()); |
| waitFor(expectation1AfterCrashBeforeLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see itself as active"); |
| |
| // one thing, before we let the waitBeforeLocking go, setup the release |
| // semaphore/mock: |
| final Semaphore waitBeforeUnlocking = new Semaphore(0); |
| Mockito.doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) throws InterruptedException { |
| logger.info("Going to waitBeforeUnlocking"); |
| waitBeforeUnlocking.acquire(); |
| logger.info("Done with waitBeforeUnlocking"); |
| missingLastRevUtil.releaseRecoveryLock( |
| (Integer) invocation.getArguments()[0], |
| (Boolean) invocation.getArguments()[1]); |
| return null; |
| } |
| }).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt(), anyBoolean()); |
| |
| // let go (or tschaedere loh) |
| waitBeforeLocking.release(); |
| |
| // then, right after we let the waitBeforeLocking semaphore go, we |
| // should see s2 in recovery mode |
| final ViewExpectation expectation1AfterCrashWhileLastRevRecoveryLocking = new ViewExpectation(s1); |
| expectation1AfterCrashWhileLastRevRecoveryLocking.setActiveIds(ns1.getClusterId()); |
| waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see itself as active"); |
| |
| // ok, meanwhile, the lastRevRecoveryAgent should have hit the ot |
| waitFor(new Expectation() { |
| |
| @Override |
| public String fulfilled() throws Exception { |
| if (!waitBeforeUnlocking.hasQueuedThreads()) { |
| return "no thread queued"; |
| } |
| return null; |
| } |
| |
| }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should want to release a lock"); |
| |
| // so then, we should still see the same state |
| waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see itself as active"); |
| |
| logger.info("Waiting 2 sec"); |
| clock.waitUntil(clock.getTime() + 2000); |
| logger.info("Waiting done"); |
| |
| // first, lets check to see what the view looks like - should be |
| // unchanged: |
| waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see itself as active"); |
| |
| // let waitBeforeUnlocking go |
| logger.info("releasing waitBeforeUnlocking, state: " + s1.getClusterViewStr()); |
| waitBeforeUnlocking.release(); |
| logger.info("released waitBeforeUnlocking"); |
| |
| if (!withBacklog) { |
| final ViewExpectation expectationWithoutBacklog = new ViewExpectation(s1); |
| expectationWithoutBacklog.setActiveIds(ns1.getClusterId()); |
| waitFor(expectationWithoutBacklog, TEST_WAIT_TIMEOUT, "only first as active"); |
| waitFor(() -> { |
| if (!getLatestClusterInfo(ns2.getClusterId(), ns2).isActive() && |
| !getLatestClusterInfo(ns2.getClusterId(), ns2).isBeingRecovered()) { |
| return null; |
| } else { |
| return "Still not inactive"; |
| } |
| }, TEST_WAIT_TIMEOUT, "Second cluster should be inactive"); |
| } else { |
| // wait just 2 sec to see if the bgReadThread is really stopped |
| logger.info("sleeping 2 sec"); |
| clock.waitUntil(clock.getTime() + 2000); |
| logger.info("sleeping 2 sec done, state: " + s1.getClusterViewStr()); |
| |
| // when that's the case, check the view - it should now be in a |
| // special 'final=false' mode |
| final ViewExpectation expectationBeforeBgRead = new ViewExpectation(s1); |
| expectationBeforeBgRead.setActiveIds(ns1.getClusterId()); |
| waitFor(() -> { |
| ClusterNodeInfoDocument latestClusterInfo = getLatestClusterInfo(ns2.getClusterId(), ns2); |
| boolean hasBacklog = s1.service.hasBacklog(latestClusterInfo); |
| if (hasBacklog) { |
| return null; |
| } else { |
| return "No Backlog"; |
| } |
| }, TEST_WAIT_TIMEOUT, "Second cluster should have backlogs"); |
| waitFor(expectationBeforeBgRead, TEST_WAIT_TIMEOUT, "first should only see itself after shutdown"); |
| |
| // ook, now we explicitly do a background read to get out of the |
| // backlog situation |
| ns1.runBackgroundReadOperations(); |
| |
| final ViewExpectation expectationAfterBgRead = new ViewExpectation(s1); |
| expectationAfterBgRead.setActiveIds(ns1.getClusterId()); |
| waitFor(expectationAfterBgRead, TEST_WAIT_TIMEOUT, "we should see s1 as only active"); |
| waitFor(() -> { |
| ClusterNodeInfoDocument latestClusterInfo = getLatestClusterInfo(ns2.getClusterId(), ns2); |
| boolean hasBacklog = s1.service.hasBacklog(latestClusterInfo); |
| if (!hasBacklog) { |
| return null; |
| } else { |
| return "Still has Backlog"; |
| } |
| }, TEST_WAIT_TIMEOUT, "Second cluster should not have backlog any longer"); |
| } |
| } |
| |
| private static ClusterViewDocument read(DocumentNodeStore documentNodeStore) { |
| DocumentStore documentStore = documentNodeStore.getDocumentStore(); |
| Document doc = documentStore.find(Collection.SETTINGS, "clusterView", |
| -1 /* -1; avoid caching */); |
| if (doc == null) { |
| return null; |
| } else { |
| ClusterViewDocument clusterView = new ClusterViewDocument(doc); |
| if (clusterView.isValid()) { |
| return clusterView; |
| } else { |
| return null; |
| } |
| } |
| } |
| |
| private ClusterNodeInfoDocument getLatestClusterInfo(int id, DocumentNodeStore nodeStore) { |
| for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(nodeStore.getDocumentStore())) { |
| int cId = doc.getClusterId(); |
| if (cId == id) { |
| return doc; |
| } |
| } |
| return null; |
| } |
| |
| private DocumentNodeStore newDocumentNodeStore(DocumentStore store, |
| String workingDir) { |
| return newDocumentNodeStore(store, workingDir, false); |
| } |
| |
| private DocumentNodeStore newDocumentNodeStore(DocumentStore store, |
| String workingDir, boolean invisible) { |
| String prevWorkingDir = ClusterNodeInfo.WORKING_DIR; |
| try { |
| // ensure that we always get a fresh cluster[node]id |
| ClusterNodeInfo.WORKING_DIR = workingDir; |
| |
| return new DocumentMK.Builder() |
| .clock(clock) |
| .setAsyncDelay(0) |
| .setDocumentStore(store) |
| .setLeaseCheckMode(LeaseCheckMode.DISABLED) |
| .setClusterInvisible(invisible) |
| .getNodeStore(); |
| } finally { |
| ClusterNodeInfo.WORKING_DIR = prevWorkingDir; |
| } |
| } |
| |
| } |