blob: ecb63c29280ba5e9472dd65b8fb62ce120f8eb3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.commons.json.JsonObject;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.junit.Rule;
import org.junit.Test;
/**
* A simple randomized dual-instance test.
*/
/**
 * A simple randomized dual-instance test.
 * <p>
 * Two {@link DocumentMK} cluster nodes share one document store. A seeded
 * random sequence of add / remove / set / move / copy / sync operations is
 * applied, and after each operation the test verifies that the acting cluster
 * node observes exactly the state implied by the changes it has synced so far.
 */
public class RandomizedClusterTest {

    private static final boolean MONGO_DB = false;
    // private static final boolean MONGO_DB = true;

    private static final int MK_COUNT = 2;

    @Rule
    public MongoConnectionFactory connectionFactory = new MongoConnectionFactory();

    private MemoryDocumentStore ds;
    private MemoryBlobStore bs;

    private DocumentMK[] mkList = new DocumentMK[MK_COUNT];

    // the head revision each cluster node currently operates against
    private String[] revList = new String[MK_COUNT];

    // per cluster node: the ids of operations committed on the *other* node
    // that this node has not synchronized (backgroundRead) yet
    @SuppressWarnings({ "unchecked", "cast" })
    private HashSet<Integer>[] unseenChanges = (HashSet<Integer>[]) new HashSet[MK_COUNT];

    // operation id -> the node changes (add/set/remove) that operation made
    private HashMap<Integer, List<Op>> changes = new HashMap<Integer, List<Op>>();

    // id (loop index) of the operation currently being executed
    private int opId;

    // index of the cluster node executing the current operation
    private int mkId;

    // human-readable trace of all operations; attached to failures
    private StringBuilder log;

    /**
     * The map of changes. Key: node name; value: the last operation that
     * changed the node.
     */
    private HashMap<String, Integer> nodeChange = new HashMap<String, Integer>();

    /**
     * Runs 1000 random operations spread over the two cluster nodes and
     * checks consistency after each one. The test also verifies that every
     * operation type both succeeded and failed at least once, so all code
     * paths are exercised.
     */
    @Test
    public void addRemoveSet() throws Exception {
        for (int i = 0; i < MK_COUNT; i++) {
            unseenChanges[i] = new HashSet<Integer>();
            mkList[i] = createMK(i);
            revList[i] = mkList[i].getHeadRevision();
        }
        HashMap<Integer, ClusterRev> revs =
                new HashMap<Integer, ClusterRev>();
        // fixed seed: the run is deterministic and reproducible
        Random r = new Random(1);
        int operations = 1000, nodeCount = 10;
        int valueCount = 10;
        int maxBackRev = 20;
        log = new StringBuilder();
        try {
            // bit masks recording which operation types succeeded / failed
            int maskOk = 0, maskFail = 0;
            int opCount = 6;
            nodeChange.clear();
            for (int i = 0; i < operations; i++) {
                opId = i;
                mkId = r.nextInt(mkList.length);
                String node = "t" + r.nextInt(nodeCount);
                String node2 = "t" + r.nextInt(nodeCount);
                String property = "x";
                String value = "" + r.nextInt(valueCount);
                String diff;
                int op = r.nextInt(opCount);
                if (i < 20) {
                    // we need to add many nodes first, so that
                    // there are enough nodes to operate on
                    op = 0;
                }
                String result;
                boolean conflictExpected;
                switch (op) {
                case 0:
                    // add a node with one property
                    diff = "+ \"" + node + "\": { \"" + property + "\": " + value + "}";
                    log(diff);
                    if (exists(node)) {
                        log("already exists");
                        result = null;
                    } else {
                        conflictExpected = isConflict(node);
                        result = commit(diff, conflictExpected);
                        if (result != null) {
                            changes.put(i, Arrays.asList(new Op(mkId, node, value)));
                            nodeChange.put(node, i);
                        }
                    }
                    break;
                case 1:
                    // remove a node
                    diff = "- \"" + node + "\"";
                    log(diff);
                    if (exists(node)) {
                        conflictExpected = isConflict(node);
                        result = commit(diff, conflictExpected);
                        if (result != null) {
                            changes.put(i, Arrays.asList(new Op(mkId, node, null)));
                            nodeChange.put(node, i);
                        }
                    } else {
                        log("doesn't exist");
                        result = null;
                    }
                    break;
                case 2:
                    // set a property
                    diff = "^ \"" + node + "/" + property + "\": " + value;
                    log(diff);
                    if (exists(node)) {
                        conflictExpected = isConflict(node);
                        result = commit(diff, conflictExpected);
                        if (result != null) {
                            changes.put(i, Arrays.asList(new Op(mkId, node, value)));
                            nodeChange.put(node, i);
                        }
                    } else {
                        log("doesn't exist");
                        result = null;
                    }
                    break;
                case 3:
                    // move a node: the source is removed, the target gets
                    // the source's current value
                    diff = "> \"" + node + "\": \"" + node2 + "\"";
                    log(diff);
                    if (exists(node) && !exists(node2)) {
                        conflictExpected = isConflict(node) || isConflict(node2);
                        result = commit(diff, conflictExpected);
                        if (result != null) {
                            value = getValue(mkId, i, node);
                            changes.put(i, Arrays.asList(
                                    new Op(mkId, node, null), new Op(mkId, node2, value)));
                            nodeChange.put(node, i);
                            nodeChange.put(node2, i);
                        }
                    } else {
                        log("source doesn't exist or target exists");
                        result = null;
                    }
                    break;
                case 4:
                    // copy a node
                    if (isConflict(node)) {
                        // the MicroKernelImpl would report a conflict
                        result = null;
                    } else {
                        diff = "* \"" + node + "\": \"" + node2 + "\"";
                        log(diff);
                        if (exists(node) && !exists(node2)) {
                            conflictExpected = isConflict(node2);
                            result = commit(diff, conflictExpected);
                            if (result != null) {
                                value = getValue(mkId, i, node);
                                changes.put(i, Arrays.asList(new Op(mkId, node2, value)));
                                nodeChange.put(node2, i);
                            }
                        } else {
                            log("source doesn't exist or target exists");
                            result = null;
                        }
                    }
                    break;
                case 5:
                    // synchronize all cluster nodes
                    log("sync/refresh");
                    syncAndRefreshAllClusterNodes();
                    // go to head revision
                    result = revList[mkId] = mkList[mkId].getHeadRevision();
                    // a sync can never legitimately fail, so mark this op as
                    // "failed" as well to satisfy the maskFail check below
                    maskFail |= 1 << op;
                    break;
                default:
                    fail();
                    result = null;
                }
                if (result == null) {
                    maskFail |= 1 << op;
                    log(" -> fail " + Integer.toBinaryString(maskFail));
                } else {
                    maskOk |= 1 << op;
                    log(" -> " + result);
                    // all other cluster nodes didn't see this particular change yet
                    for (int j = 0; j < unseenChanges.length; j++) {
                        if (j != mkId) {
                            unseenChanges[j].add(i);
                        }
                    }
                }
                // verify the acting node sees the expected state of both nodes
                log("get " + node);
                boolean x = get(i, node);
                log("get " + node + " returns " + x);
                log("get " + node2);
                x = get(i, node2);
                log("get " + node2 + " returns " + x);
                DocumentMK mk = mkList[mkId];
                ClusterRev cr = new ClusterRev();
                cr.mkId = mkId;
                cr.rev = mk.getHeadRevision();
                revs.put(i, cr);
                // only keep the last maxBackRev revisions
                revs.remove(i - maxBackRev);
                log.append('\n');
            }
            if (Integer.bitCount(maskOk) != opCount) {
                fail("Not all operations were at least once successful: " + Integer.toBinaryString(maskOk));
            }
            if (Integer.bitCount(maskFail) != opCount) {
                fail("Not all operations failed at least once: " + Integer.toBinaryString(maskFail));
            }
        } catch (AssertionError | Exception e) {
            // attach the full operation trace to ease reproducing the failure
            throw new Exception("log: " + log, e);
        } finally {
            // dispose even when the test fails, so no cluster node leaks
            for (int i = 0; i < MK_COUNT; i++) {
                mkList[i].dispose();
            }
        }
        // System.out.println(log);
        // System.out.println();
    }

    /**
     * Computes the expected value of a node as visible to a given cluster
     * node, by replaying the change history backwards and skipping changes
     * the cluster node has not synced yet.
     *
     * @param clusterId the cluster node index
     * @param maxOp the highest operation id to consider
     * @param nodeName the node name
     * @return the expected property value, or null if the node should not exist
     */
    private String getValue(int clusterId, int maxOp, String nodeName) {
        for (int i = maxOp; i >= 0; i--) {
            List<Op> ops = changes.get(i);
            if (ops != null) {
                for (Op o : ops) {
                    // ignore changes from other cluster nodes that were
                    // not synchronized yet
                    if (o.clusterId != clusterId && unseenChanges[clusterId].contains(i)) {
                        continue;
                    }
                    if (o.nodeName.equals(nodeName)) {
                        return o.value;
                    }
                }
            }
        }
        return null;
    }

    /**
     * Whether committing a change to the given node on the current cluster
     * node is expected to conflict: the node's last change happened on the
     * other cluster node and was not synchronized yet.
     */
    private boolean isConflict(String node) {
        Integer change = nodeChange.get(node);
        return change != null && unseenChanges[mkId].contains(change);
    }

    /** Appends a message (prefixed with operation and cluster id) to the trace. */
    private void log(String msg) {
        msg = opId + ": [" + mkId + "] " + msg + "\n";
        log.append(msg);
    }

    /**
     * Flushes pending writes of all cluster nodes and reads the pending
     * changes into the current cluster node only.
     */
    private void syncClusterNode() {
        for (int i = 0; i < mkList.length; i++) {
            DocumentMK mk = mkList[i];
            mk.backgroundWrite();
        }
        DocumentMK mk = mkList[mkId];
        mk.backgroundRead();
    }

    /**
     * Fully synchronizes all cluster nodes: after this call every node is at
     * its head revision and no change is "unseen" anywhere.
     */
    private void syncAndRefreshAllClusterNodes() {
        syncClusterNode();
        for (int i = 0; i < mkList.length; i++) {
            DocumentMK mk = mkList[i];
            mk.backgroundRead();
            revList[i] = mk.getHeadRevision();
            unseenChanges[i].clear();
        }
        log("sync");
    }

    /** Verifies the node state at the current cluster node's revision. */
    private boolean get(int maxOp, String node) {
        String head = revList[mkId];
        return get(maxOp, node, head);
    }

    /** Whether the node exists at the current cluster node's revision. */
    private boolean exists(String node) {
        String head = revList[mkId];
        DocumentMK mk = mkList[mkId];
        return mk.nodeExists("/" + node, head);
    }

    /**
     * Verifies that the given node either does not exist (if the expected
     * value is null) or exists with exactly the expected property value.
     *
     * @return true if the node exists
     */
    private boolean get(int maxOp, String node, String head) {
        String p = "/" + node;
        DocumentMK mk = mkList[mkId];
        String value = getValue(mkId, maxOp, node);
        if (value == null) {
            assertFalse("path: " + p + " is supposed to not exist",
                    mk.nodeExists(p, head));
            return false;
        }
        assertTrue("path: " + p + " is supposed to exist",
                mk.nodeExists(p, head));
        String expected = "{\":childNodeCount\":0,\"x\":" + value + "}";
        String result = mk.getNodes(p, head, 0, 0, Integer.MAX_VALUE, null);
        // normalize both sides so property order doesn't matter
        expected = normalize(expected);
        result = normalize(result);
        assertEquals(expected, result);
        return true;
    }

    /** Parses and re-serializes a JSON object to get a canonical form. */
    private static String normalize(String json) {
        JsopTokenizer t = new JsopTokenizer(json);
        t.read('{');
        JsonObject o = JsonObject.create(t);
        JsopBuilder w = new JsopBuilder();
        o.toJson(w);
        return w.toString();
    }

    /**
     * Commits a diff on the current cluster node.
     *
     * @param diff the jsop diff
     * @param conflictExpected whether the commit is expected to fail with a
     *            conflict; in that case the commit must throw, and all
     *            cluster nodes are re-synchronized afterwards
     * @return the new head revision, or null if the commit failed as expected
     */
    private String commit(String diff, boolean conflictExpected) {
        boolean ok = false;
        DocumentMK mk = mkList[mkId];
        String rev = revList[mkId];
        String result = null;
        String ex = null;
        if (conflictExpected) {
            ok = false;
            ex = "conflict expected";
        } else {
            ok = true;
        }
        if (ok) {
            result = mk.commit("/", diff, rev, null);
            revList[mkId] = result;
        } else {
            // System.out.println("--> fail " + e.toString());
            try {
                mk.commit("/", diff, rev, null);
                fail("Should fail: " + diff + " with " + ex);
            } catch (DocumentStoreException e2) {
                // expected
                revList[mkId] = mk.getHeadRevision();
                // it might have been not a conflict with another cluster node
                // TODO test two cases: conflict with other cluster node
                // (this should auto-synchronize until the given conflict)
                // and conflict with a previous change that was already seen,
                // which shouldn't synchronize
                syncAndRefreshAllClusterNodes();
            }
        }
        return result;
    }

    /**
     * Creates a DocumentMK instance for the given (zero-based) cluster id,
     * backed either by MongoDB or by shared in-memory stores.
     */
    private DocumentMK createMK(int clusterId) {
        DocumentMK.Builder builder = new DocumentMK.Builder();
        // disable background threads; the test drives sync explicitly
        builder.setAsyncDelay(0);
        if (MONGO_DB) {
            MongoConnection c = connectionFactory.getConnection();
            MongoUtils.dropCollections(c.getDBName());
            builder.setMongoDB(c.getMongoClient(), c.getDBName());
        } else {
            // all cluster nodes share the same in-memory stores
            if (ds == null) {
                ds = new MemoryDocumentStore();
            }
            if (bs == null) {
                bs = new MemoryBlobStore();
            }
            builder.setDocumentStore(ds).setBlobStore(bs);
        }
        return builder.setClusterId(clusterId + 1).open();
    }

    /**
     * A revision in a certain cluster node.
     */
    static class ClusterRev {
        int mkId;
        String rev;
    }

    /**
     * An operation.
     */
    static class Op {
        final int clusterId;
        final String nodeName;
        // the property value written, or null if the node was removed
        final String value;

        public Op(int clusterId, String nodeName, String value) {
            this.clusterId = clusterId;
            this.nodeName = nodeName;
            this.value = value;
        }
    }
}