/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import static org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.LEASE_END_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jcr.PropertyType;
import javax.jcr.Value;
import javax.jcr.ValueFormatException;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.commons.json.JsonObject;
import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
import org.apache.jackrabbit.oak.commons.junit.LogDumper;
import org.apache.jackrabbit.oak.commons.junit.LogLevelModifier;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.mockito.stubbing.Answer;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.jackrabbit.guava.common.collect.Lists;
import com.mongodb.client.MongoDatabase;
import junitx.util.PrivateAccessor;
/**
* Abstract base class for the DocumentDiscoveryLiteService tests
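*
* <p>
* A minimal usage sketch (hypothetical test body in a concrete subclass;
* only members defined in this class are used):
* <pre>{@code
* SimplifiedInstance s1 = createInstance();
* SimplifiedInstance s2 = createInstance();
* ViewExpectation e = new ViewExpectation(s1);
* e.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
* waitFor(e, 30000, "both instances should become active");
* }</pre>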
*/
public abstract class BaseDocumentDiscoveryLiteServiceTest {
@Rule
public MongoConnectionFactory connectionFactory = new MongoConnectionFactory();
/**
* container for what should represent an instance, but is not a complete
* one, hence 'simplified'. Most importantly it contains the
* DocumentNodeStore and the discoveryLite service.
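* <p>
* Typical lifecycle in a test (a sketch; assumes an instance obtained via
* createInstance()):
* <pre>{@code
* SimplifiedInstance inst = createInstance();
* inst.addNode("/some/test/node");
* inst.startSimulatingWrites(500);
* // ... assertions via ViewExpectation and waitFor ...
* inst.shutdown(); // or inst.crash() to simulate a kill -9
* }</pre>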
*/
class SimplifiedInstance {
DocumentDiscoveryLiteService service;
DocumentNodeStore ns;
private final Descriptors descriptors;
private Map<Object, Object> registeredServices;
private final long lastRevInterval;
private volatile boolean lastRevStopped = false;
private volatile boolean writeSimulationStopped = false;
private Thread lastRevThread;
private Thread writeSimulationThread;
public String workingDir;
SimplifiedInstance(DocumentDiscoveryLiteService service, DocumentNodeStore ns, Descriptors descriptors,
Map<Object, Object> registeredServices, long lastRevInterval, String workingDir) {
this.service = service;
this.ns = ns;
this.workingDir = workingDir;
this.descriptors = descriptors;
this.registeredServices = registeredServices;
this.lastRevInterval = lastRevInterval;
if (lastRevInterval > 0) {
startLastRevThread();
}
}
@Override
public String toString() {
return "SimplifiedInstance[cid=" + ns.getClusterId() + "]";
}
void startLastRevThread() {
lastRevStopped = false;
lastRevThread = new Thread(new Runnable() {
@Override
public void run() {
while (!lastRevStopped) {
SimplifiedInstance.this.ns.getLastRevRecoveryAgent().performRecoveryIfNeeded();
try {
Thread.sleep(SimplifiedInstance.this.lastRevInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
lastRevThread.setDaemon(true);
lastRevThread.setName("lastRevThread[cid=" + ns.getClusterId() + "]");
lastRevThread.start();
}
void stopLastRevThread() throws InterruptedException {
lastRevStopped = true;
lastRevThread.join();
}
boolean isFinal() throws Exception {
final JsonObject clusterViewObj = getClusterViewObj();
if (clusterViewObj == null) {
throw new IllegalStateException("no clusterView - should always have one with the final flag set");
}
String finalStr = clusterViewObj.getProperties().get("final");
return Boolean.parseBoolean(finalStr);
}
boolean isInvisible() {
return ns.getClusterInfo().isInvisible();
}
boolean hasActiveIds(String clusterViewStr, int... expected) throws Exception {
return hasIds(clusterViewStr, "active", expected);
}
boolean hasDeactivatingIds(String clusterViewStr, int... expected) throws Exception {
return hasIds(clusterViewStr, "deactivating", expected);
}
boolean hasInactiveIds(String clusterViewStr, int... expected) throws Exception {
return hasIds(clusterViewStr, "inactive", expected);
}
private boolean hasIds(final String clusterViewStr, final String key, int... expectedIds) throws Exception {
final JsonObject clusterViewObj = asJsonObject(clusterViewStr);
String actualIdsStr = clusterViewObj == null ? null : clusterViewObj.getProperties().get(key);
boolean actualEmpty = actualIdsStr == null || actualIdsStr.length() == 0 || actualIdsStr.equals("[]");
boolean expectedEmpty = expectedIds == null || expectedIds.length == 0;
if (actualEmpty && expectedEmpty) {
return true;
}
if (actualEmpty != expectedEmpty) {
return false;
}
final List<Integer> actualList = Arrays
.asList(ClusterViewDocument.csvToIntegerArray(actualIdsStr.substring(1, actualIdsStr.length() - 1)));
if (expectedIds.length != actualList.size()) {
return false;
}
for (int anExpectedId : expectedIds) {
if (!actualList.contains(anExpectedId)) {
return false;
}
}
return true;
}
JsonObject getClusterViewObj() throws Exception {
final String json = getClusterViewStr();
return asJsonObject(json);
}
private JsonObject asJsonObject(final String json) {
if (json == null) {
return null;
}
JsopTokenizer t = new JsopTokenizer(json);
t.read('{');
JsonObject o = JsonObject.create(t);
return o;
}
String getClusterViewStr() throws Exception {
return getDescriptor(DocumentDiscoveryLiteService.OAK_DISCOVERYLITE_CLUSTERVIEW);
}
String getDescriptor(String key) throws Exception {
final Value value = descriptors.getValue(key);
if (value == null) {
return null;
}
if (value.getType() != PropertyType.STRING) {
return null;
}
try {
return value.getString();
} catch (ValueFormatException vfe) {
return null;
}
}
public void dispose() {
logger.info("Disposing " + this);
try {
stopSimulatingWrites();
} catch (InterruptedException e) {
fail("interrupted");
}
if (lastRevThread != null) {
try {
stopLastRevThread();
} catch (InterruptedException ok) {
fail("interrupted");
}
lastRevThread = null;
}
if (service != null) {
service.deactivate();
service = null;
}
if (ns != null) {
ns.dispose();
ns = null;
}
if (registeredServices != null) {
registeredServices.clear();
registeredServices = null;
}
}
/**
* shutdown simulates a normal, graceful shutdown
*
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException {
stopSimulatingWrites();
stopLastRevThread();
ns.dispose();
service.deactivate();
}
/**
* crash simulates a kill -9, sort of
*
* @throws Throwable
*/
public void crash() throws Throwable {
logger.info("crash: stopping simulating writes...");
stopSimulatingWrites();
logger.info("crash: stopping lastrev thread...");
stopLastRevThread();
logger.info("crash: stopped lastrev thread, now setting lease to end within 1 sec");
boolean renewed = setLeaseTime(1000 /* 1 sec */, 10 /*10ms*/);
if (!renewed) {
logger.info("halt");
fail("did not renew clusterid lease");
}
logger.info("crash: now stopping background read/update");
stopAllBackgroundThreads();
// but don't do the following from DocumentNodeStore.dispose():
// * don't do the last internalRunBackgroundUpdateOperations - as
// we're trying to simulate a crash here
// * don't dispose clusterNodeInfo to leave the node in active state
// the DocumentDiscoveryLiteService currently can simply be
// deactivated, doesn't differ much from crashing
service.deactivate();
logger.info("crash: crash simulation done.");
}
/**
* very hacky way to ensure this instance's clusterNodes entry is marked
* with a very short lease end time (about 1 sec away), so that the crash
* detection doesn't take a minute (as it would by default)
*/
private boolean setLeaseTime(final int leaseTime, final int leaseUpdateInterval) throws NoSuchFieldException {
ns.getClusterInfo().setLeaseTime(leaseTime);
ns.getClusterInfo().setLeaseUpdateInterval(leaseUpdateInterval);
long newLeaseTime = System.currentTimeMillis() + (leaseTime / 3) - 10 /* 10ms safety margin */;
PrivateAccessor.setField(ns.getClusterInfo(), "leaseEndTime", newLeaseTime);
// OAK-9564: Apply the update to the nodeStore at a low level, as the max operation wouldn't
// allow applying the new lease time if it is lower than the current one.
UpdateOp op = new UpdateOp(String.valueOf(ns.getClusterId()), false);
op.set(LEASE_END_KEY, newLeaseTime);
ns.getDocumentStore().findAndUpdate(Collection.CLUSTER_NODES, op);
boolean renewed = ns.renewClusterIdLease();
return renewed;
}
private AtomicBoolean getIsDisposed() throws NoSuchFieldException {
AtomicBoolean isDisposed = (AtomicBoolean) PrivateAccessor.getField(ns, "isDisposed");
return isDisposed;
}
private AtomicBoolean getStopLeaseUpdateThread() throws NoSuchFieldException {
AtomicBoolean stopLeaseUpdateThread = (AtomicBoolean) PrivateAccessor.getField(ns, "stopLeaseUpdateThread");
return stopLeaseUpdateThread;
}
private void stopAllBackgroundThreads() throws NoSuchFieldException {
// get all those background threads...
Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread");
assertNotNull(backgroundReadThread);
Thread backgroundUpdateThread = (Thread) PrivateAccessor.getField(ns, "backgroundUpdateThread");
assertNotNull(backgroundUpdateThread);
Thread leaseUpdateThread = (Thread) PrivateAccessor.getField(ns, "leaseUpdateThread");
assertNotNull(leaseUpdateThread);
// start doing what DocumentNodeStore.dispose() would do - except in a
// finely controlled way, basically:
// make sure to stop backgroundReadThread, backgroundUpdateThread
// and leaseUpdateThread,
// but then nothing else.
final AtomicBoolean isDisposed = getIsDisposed();
assertFalse(isDisposed.getAndSet(true));
// notify background threads waiting on isDisposed
synchronized (isDisposed) {
isDisposed.notifyAll();
}
try {
backgroundReadThread.join(5000);
assertFalse(backgroundReadThread.isAlive());
} catch (InterruptedException e) {
// ignore
}
try {
backgroundUpdateThread.join(5000);
assertFalse(backgroundUpdateThread.isAlive());
} catch (InterruptedException e) {
// ignore
}
final AtomicBoolean stopLeaseUpdateThread = getStopLeaseUpdateThread();
assertFalse(stopLeaseUpdateThread.getAndSet(true));
// notify background threads waiting on stopLeaseUpdateThread
synchronized (stopLeaseUpdateThread) {
stopLeaseUpdateThread.notifyAll();
}
try {
leaseUpdateThread.join(5000);
assertFalse(leaseUpdateThread.isAlive());
} catch (InterruptedException e) {
// ignore
}
}
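/**
* Stops only the background read thread (by swapping in a fresh isDisposed
* flag for its runnable and setting it), leaving the update and lease
* update threads running.
*/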
public void stopBgReadThread() throws NoSuchFieldException {
final Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread");
assertNotNull(backgroundReadThread);
final Runnable bgReadRunnable = (Runnable) PrivateAccessor.getField(backgroundReadThread, "target");
assertNotNull(bgReadRunnable);
final AtomicBoolean bgReadIsDisposed = new AtomicBoolean(false);
PrivateAccessor.setField(bgReadRunnable, "isDisposed", bgReadIsDisposed);
assertFalse(bgReadIsDisposed.getAndSet(true));
try {
backgroundReadThread.join(5000);
assertFalse(backgroundReadThread.isAlive());
} catch (InterruptedException e) {
// ignore
}
// a bit of heavy work, but now the backgroundReadThread is stopped
// and all the others are still running
}
public void addNode(String path) throws CommitFailedException {
NodeBuilder root = ns.getRoot().builder();
NodeBuilder child = root;
String[] split = path.split("/");
for (int i = 1; i < split.length; i++) {
child = child.child(split[i]);
}
logger.info("addNode: " + ns.getClusterId() + " is merging path " + path);
ns.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
public void setProperty(String path, String key, String value) throws CommitFailedException {
NodeBuilder root = ns.getRoot().builder();
NodeBuilder child = root;
String[] split = path.split("/");
for (int i = 1; i < split.length; i++) {
child = child.child(split[i]);
}
child.setProperty(key, value);
logger.info("setProperty: " + ns.getClusterId() + " is merging path/property " + path + ", key=" + key + ", value="
+ value);
ns.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
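/**
* Shortens the lease timeout and update interval, and moves this instance's
* leaseEndTime into the past, so the lease counts as expired right away.
*/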
public void setLeastTimeout(long timeoutInMs, long updateIntervalInMs) throws NoSuchFieldException {
ns.getClusterInfo().setLeaseTime(timeoutInMs);
ns.getClusterInfo().setLeaseUpdateInterval(updateIntervalInMs);
PrivateAccessor.setField(ns.getClusterInfo(), "leaseEndTime", System.currentTimeMillis() - 1000);
}
void startSimulatingWrites(final long writeInterval) {
writeSimulationStopped = false;
writeSimulationThread = new Thread(new Runnable() {
final Random random = new Random();
@Override
public void run() {
while (!writeSimulationStopped) {
try {
writeSomething();
Thread.sleep(writeInterval);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void writeSomething() throws CommitFailedException {
final String path = "/" + ns.getClusterId() + "/" + random.nextInt(100) + "/" + random.nextInt(100) + "/"
+ random.nextInt(100);
logger.info("Writing [" + ns.getClusterId() + "]" + path);
addNode(path);
}
});
writeSimulationThread.setDaemon(true);
writeSimulationThread.start();
}
void stopSimulatingWrites() throws InterruptedException {
writeSimulationStopped = true;
if (writeSimulationThread != null) {
writeSimulationThread.join();
}
}
}
interface Expectation {
/**
* check if the expectation is fulfilled: return null if it is, or a
* descriptive error msg if not
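* <p>
* A minimal sketch of an ad-hoc expectation (hypothetical; 'inst' stands
* for some SimplifiedInstance created by the test):
* <pre>{@code
* Expectation oneActive = new Expectation() {
*     public String fulfilled() throws Exception {
*         String view = inst.getClusterViewStr();
*         return inst.hasActiveIds(view, 1) ? null : "clusterId 1 not active yet";
*     }
* };
* waitFor(oneActive, 10000, "clusterId 1 should become active");
* }</pre>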
**/
String fulfilled() throws Exception;
}
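/**
* Expectation on the discovery-lite cluster view of one SimplifiedInstance:
* compares the active, deactivating and inactive id sets, plus the 'final'
* flag, against the clusterView descriptor of that instance.
*/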
class ViewExpectation implements Expectation {
private int[] activeIds = new int[0];
private int[] deactivatingIds = new int[0];
private int[] inactiveIds = new int[0];
private final SimplifiedInstance discoveryLiteCombo;
private boolean isFinal = true;
ViewExpectation(SimplifiedInstance discoveryLiteCombo) {
this.discoveryLiteCombo = discoveryLiteCombo;
}
private int[] asIntArray(Integer[] arr) {
int[] result = new int[arr.length];
for (int i = 0; i < arr.length; i++) {
result[i] = arr[i];
}
return result;
}
void setActiveIds(Integer[] activeIds) {
this.activeIds = asIntArray(activeIds);
}
void setActiveIds(int... activeIds) {
this.activeIds = activeIds;
}
void setDeactivatingIds(int... deactivatingIds) {
this.deactivatingIds = deactivatingIds;
}
void setInactiveIds(Integer[] inactiveIds) {
this.inactiveIds = asIntArray(inactiveIds);
}
void setInactiveIds(int... inactiveIds) {
this.inactiveIds = inactiveIds;
}
@Override
public String fulfilled() throws Exception {
final String clusterViewStr = discoveryLiteCombo.getClusterViewStr();
if (clusterViewStr == null) {
if (activeIds.length != 0) {
return "no clusterView, but expected activeIds: " + beautify(activeIds);
}
if (deactivatingIds.length != 0) {
return "no clusterView, but expected deactivatingIds: " + beautify(deactivatingIds);
}
if (inactiveIds.length != 0) {
return "no clusterView, but expected inactiveIds: " + beautify(inactiveIds);
}
}
if (!discoveryLiteCombo.hasActiveIds(clusterViewStr, activeIds)) {
return "activeIds dont match, expected: " + beautify(activeIds) + ", got clusterView: " + clusterViewStr;
}
if (!discoveryLiteCombo.hasDeactivatingIds(clusterViewStr, deactivatingIds)) {
return "deactivatingIds dont match, expected: " + beautify(deactivatingIds) + ", got clusterView: "
+ clusterViewStr;
}
if (!discoveryLiteCombo.hasInactiveIds(clusterViewStr, inactiveIds)) {
return "inactiveIds dont match, expected: " + beautify(inactiveIds) + ", got clusterView: " + clusterViewStr;
}
if (!discoveryLiteCombo.isInvisible() && discoveryLiteCombo.isFinal() != isFinal) {
return "final flag does not match. expected: " + isFinal + ", but is: " + discoveryLiteCombo.isFinal();
}
return null;
}
private String beautify(int[] ids) {
if (ids == null) {
return "";
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < ids.length; i++) {
if (i != 0) {
sb.append(",");
}
sb.append(ids[i]);
}
return sb.toString();
}
public void setFinal(boolean isFinal) {
this.isFinal = isFinal;
}
}
// private static final boolean MONGO_DB = true;
private static final boolean MONGO_DB = false;
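// seed for the random start/stop fiesta; override via the
// "BaseDocumentDiscoveryLiteServiceTest-seed" system property to reproduce a particular run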
static final int SEED = Integer.getInteger(BaseDocumentDiscoveryLiteServiceTest.class.getSimpleName() + "-seed",
new Random().nextInt());
private List<DocumentMK> mks = Lists.newArrayList();
private MemoryDocumentStore ds;
private MemoryBlobStore bs;
final Logger logger = LoggerFactory.getLogger(this.getClass());
private List<SimplifiedInstance> allInstances = new LinkedList<SimplifiedInstance>();
@Rule
public TestRule logDumper = new LogDumper(50000);
@Rule
public TestRule logLevelModifier = new LogLevelModifier()
.newConsoleAppender("console")
.addAppenderFilter("console", "info")
.addAppenderFilter("file", "info")
.setLoggerLevel("org.apache.jackrabbit.oak", "debug");
// subsequent tests should get a DocumentDiscoveryLiteService setup from the
// start
DocumentNodeStore createNodeStore(String workingDir) throws Exception {
return createNodeStore(workingDir, false);
}
// subsequent tests should get a DocumentDiscoveryLiteService setup from the
// start
DocumentNodeStore createNodeStore(String workingDir, boolean invisible) throws Exception {
String prevWorkingDir = ClusterNodeInfo.WORKING_DIR;
try {
// ensure that we always get a fresh cluster[node]id
ClusterNodeInfo.WORKING_DIR = workingDir;
// then create the DocumentNodeStore
DocumentMK mk1 = createMK(
0 /* to make sure the clusterNodes collection is used */,
500, /* asyncDelay: background interval */
invisible /* cluster node invisibility */);
logger.info("createNodeStore: created DocumentNodeStore with cid=" + mk1.nodeStore.getClusterId() + ", workingDir="
+ workingDir);
return mk1.nodeStore;
}
finally {
ClusterNodeInfo.WORKING_DIR = prevWorkingDir;
}
}
SimplifiedInstance createInstance() throws Exception {
return createInstance(false);
}
SimplifiedInstance createInstance(boolean invisible) throws Exception {
final String workingDir = UUID.randomUUID().toString();
return createInstance(workingDir, invisible);
}
SimplifiedInstance createInstance(String workingDir) throws Exception {
return createInstance(workingDir, false);
}
SimplifiedInstance createInstance(String workingDir, boolean invisible) throws Exception {
DocumentNodeStore ns = createNodeStore(workingDir, invisible);
return createInstance(ns, workingDir);
}
SimplifiedInstance createInstance(DocumentNodeStore ns, String workingDir) throws NoSuchFieldException {
DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService();
PrivateAccessor.setField(discoveryLite, "nodeStore", ns);
BundleContext bc = mock(BundleContext.class);
ComponentContext c = mock(ComponentContext.class);
when(c.getBundleContext()).thenReturn(bc);
final Map<Object, Object> registeredServices = new HashMap();
when(bc.registerService(any(Class.class), any(Object.class), any())).then((Answer<ServiceRegistration>) invocation -> {
registeredServices.put(invocation.getArguments()[0], invocation.getArguments()[1]);
return null;
});
discoveryLite.activate(c);
Descriptors d = (Descriptors) registeredServices.get(Descriptors.class);
final SimplifiedInstance result = new SimplifiedInstance(discoveryLite, ns, d, registeredServices, 500, workingDir);
allInstances.add(result);
logger.info("Created " + result);
return result;
}
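/**
* Polls the given expectation every 100ms until it is fulfilled (i.e.
* fulfilled() returns null) or the timeout (in ms) elapses, in which
* case the test fails with the given message and the fulfillment result.
*/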
void waitFor(Expectation expectation, int timeout, String msg) throws Exception {
final long tooLate = System.currentTimeMillis() + timeout;
while (true) {
final String fulfillmentResult = expectation.fulfilled();
if (fulfillmentResult == null) {
// everything's fine
return;
}
if (System.currentTimeMillis() > tooLate) {
fail("expectation not fulfilled within " + timeout + "ms: " + msg + ", fulfillment result: " + fulfillmentResult);
}
Thread.sleep(100);
}
}
void dumpChildren(DocumentNodeState root) {
logger.info("testEmptyParentRecovery: root: " + root);
Iterator<String> it = root.getChildNodeNames().iterator();
while (it.hasNext()) {
String n = it.next();
logger.info("testEmptyParentRecovery: a child: '" + n + "'");
}
}
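/**
* Verifies that every visible instance converges on the expected cluster
* view: all visible instances' ids active, the given ids inactive
* (invisible instances are skipped).
*/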
void checkFiestaState(final List<SimplifiedInstance> instances, Set<Integer> inactiveIds) throws Exception {
final List<Integer> activeIds = new LinkedList<Integer>();
for (Iterator<SimplifiedInstance> it = instances.iterator(); it.hasNext();) {
SimplifiedInstance anInstance = it.next();
if (!anInstance.isInvisible()) {
activeIds.add(anInstance.ns.getClusterId());
}
}
logger.info("checkFiestaState: checking state. expected active: "+activeIds+", inactive: "+inactiveIds);
for (Iterator<SimplifiedInstance> it = instances.iterator(); it.hasNext();) {
SimplifiedInstance anInstance = it.next();
if (!anInstance.isInvisible()) {
final ViewExpectation e = new ViewExpectation(anInstance);
e.setActiveIds(activeIds.toArray(new Integer[activeIds.size()]));
e.setInactiveIds(inactiveIds.toArray(new Integer[inactiveIds.size()]));
waitFor(e, 60000, "checkFiestaState failed for " + anInstance + ", with instances: " + instances + ","
+ " and inactiveIds: "
+ inactiveIds);
}
}
}
@Before
@After
public void clear() {
logger.info("clear: seed="+SEED);
for (SimplifiedInstance i : allInstances) {
i.dispose();
}
for (DocumentMK mk : mks) {
mk.dispose();
}
mks.clear();
if (MONGO_DB) {
MongoConnection connection = connectionFactory.getConnection();
if (connection != null) {
MongoDatabase db = connection.getDatabase();
if (db != null) {
MongoUtils.dropCollections(db);
}
}
}
}
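// creates a DocumentMK backed either by MongoDB (when MONGO_DB is set) or by
// shared in-memory document/blob stores, with lease checks disabled; the
// instance is registered for disposal in clear()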
DocumentMK createMK(int clusterId, int asyncDelay) {
return createMK(clusterId, asyncDelay, false);
}
DocumentMK createMK(int clusterId, int asyncDelay, boolean invisible) {
if (MONGO_DB) {
MongoConnection connection = connectionFactory.getConnection();
return register(new DocumentMK.Builder()
.setMongoDB(connection.getMongoClient(), connection.getDBName())
.setLeaseCheckMode(LeaseCheckMode.DISABLED).setClusterId(clusterId)
.setAsyncDelay(asyncDelay).open());
} else {
if (ds == null) {
ds = new MemoryDocumentStore();
}
if (bs == null) {
bs = new MemoryBlobStore();
}
return createMK(clusterId, asyncDelay, ds, bs, invisible);
}
}
DocumentMK createMK(int clusterId, int asyncDelay, DocumentStore ds, BlobStore bs, boolean invisible) {
return register(new DocumentMK.Builder().setDocumentStore(ds).setBlobStore(bs).setClusterId(clusterId).setClusterInvisible(invisible)
.setLeaseCheckMode(LeaseCheckMode.DISABLED)
.setAsyncDelay(asyncDelay).open());
}
DocumentMK register(DocumentMK mk) {
mks.add(mk);
return mk;
}
/**
* Decides whether an instance should be invisible, with a probability of 20%.
* @param random the random source to draw from
* @return true in roughly 20% of the calls
*/
boolean isInvisibleInstance(Random random) {
return random.nextDouble() <= 0.2;
}
/**
* This test creates a large number of DocumentNodeStores which it starts,
* runs and stops in a random fashion, checking after each step that the
* clusterView is correct
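* <p>
* A concrete test would typically just invoke this with a loop count
* (a sketch; the loop count of 50 is arbitrary):
* <pre>{@code
* doStartStopFiesta(50);
* }</pre>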
*/
void doStartStopFiesta(int loopCnt) throws Throwable {
logger.info("testLargeStartStopFiesta: start, seed="+SEED);
final List<SimplifiedInstance> instances = new LinkedList<SimplifiedInstance>();
final Map<Integer, String> inactiveIds = new HashMap<Integer, String>();
final Random random = new Random(SEED);
final int CHECK_EVERY = 3;
final int MAX_NUM_INSTANCES = 8;
for (int i = 0; i < loopCnt; i++) {
if (i % CHECK_EVERY == 0) {
checkFiestaState(instances, inactiveIds.keySet());
}
final int nextInt = random.nextInt(5);
// logger.info("testLargeStartStopFiesta: iteration "+i+" with case
// "+nextInt);
String workingDir = UUID.randomUUID().toString();
switch (nextInt) {
case 0: {
// increase likelihood of creating instances..
// but reuse an inactive one if possible
if (inactiveIds.size() > 0) {
logger.info("Case 0 - reactivating an instance...");
final int n = random.nextInt(inactiveIds.size());
final Integer cid = new LinkedList<Integer>(inactiveIds.keySet()).get(n);
final String reactivatedWorkingDir = inactiveIds.remove(cid);
if (reactivatedWorkingDir == null) {
fail("reactivatedWorkingDir null for n=" + n + ", cid=" + cid + ", other inactives: " + inactiveIds);
}
assertNotNull(reactivatedWorkingDir);
logger.info("Case 0 - reactivated instance " + cid + ", workingDir=" + reactivatedWorkingDir);
workingDir = reactivatedWorkingDir;
logger.info("Case 0: creating instance");
final SimplifiedInstance newInstance = createInstance(workingDir, isInvisibleInstance(random));
newInstance.setLeastTimeout(5000, 1000);
newInstance.startSimulatingWrites(500);
logger.info("Case 0: created instance: " + newInstance.ns.getClusterId());
if (newInstance.ns.getClusterId() != cid) {
logger.info(
"Case 0: reactivated instance did not take over cid - probably a testing artifact. expected cid: {}, actual cid: {}",
cid, newInstance.ns.getClusterId());
inactiveIds.put(cid, reactivatedWorkingDir);
// remove the newly reactivated from the inactives -
// although it shouldn't be there, it might be!
inactiveIds.remove(newInstance.ns.getClusterId());
}
instances.add(newInstance);
}
break;
}
case 1: {
// creates a new instance
if (instances.size() < MAX_NUM_INSTANCES) {
logger.info("Case 1: creating instance");
final SimplifiedInstance newInstance = createInstance(workingDir, isInvisibleInstance(random));
newInstance.setLeastTimeout(5000, 1000);
newInstance.startSimulatingWrites(500);
logger.info("Case 1: created instance: " + newInstance.ns.getClusterId());
instances.add(newInstance);
// OAK-3292 : in case a previously crashed or shut-down instance is created again here,
// make sure to remove it from the inactives (if it is in the inactive list at all)
inactiveIds.remove(newInstance.ns.getClusterId());
}
break;
}
case 2: {
// do nothing
break;
}
case 3: {
// shutdown instance
if (instances.size() > 1) {
// before shutting down: make sure we have a stable view
// (otherwise we could not correctly start up again either)
checkFiestaState(instances, inactiveIds.keySet());
final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size()));
assertNotNull(instance.workingDir);
logger.info("Case 3: Shutdown instance: " + instance.ns.getClusterId());
inactiveIds.put(instance.ns.getClusterId(), instance.workingDir);
instance.shutdown();
}
break;
}
case 4: {
// crash instance
if (instances.size() > 1) {
// before crashing: make sure we have a stable view
// (otherwise we could not correctly start up again either)
checkFiestaState(instances, inactiveIds.keySet());
final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size()));
assertNotNull(instance.workingDir);
logger.info("Case 4: Crashing instance: " + instance.ns.getClusterId());
if (!instance.isInvisible()) {
inactiveIds.put(instance.ns.getClusterId(), instance.workingDir);
}
instance.addNode("/" + instance.ns.getClusterId() + "/stuffForRecovery/" + random.nextInt(10000));
instance.crash();
}
break;
}
}
}
}
}