/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.jcr;
import static java.lang.Thread.UncaughtExceptionHandler;
import static org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest.dispose;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jcr.Node;
import javax.jcr.PropertyType;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Concurrently adds nodes with multiple sessions on multiple DocumentMK
 * cluster nodes sharing the same MongoDB database.
 */
public class ConcurrentAddNodesClusterIT {
private static final int NUM_CLUSTER_NODES = 5; // DocumentMK instances sharing one MongoDB database
private static final int NODE_COUNT = 100; // child nodes created per batch
private static final int LOOP_COUNT = 10; // batches (with a save each) per worker
private static final int WORKER_COUNT = 20; // concurrent workers per cluster node
private static final String PROP_NAME = "testcount"; // indexed test property
private List<Repository> repos = new ArrayList<Repository>();
private List<DocumentMK> mks = new ArrayList<DocumentMK>();
private List<Thread> workers = new ArrayList<Thread>();
@BeforeClass
public static void mongoDBAvailable() {
Assume.assumeTrue(OakMongoNSRepositoryStub.isMongoDBAvailable());
}
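/**
 * Each test runs against a fresh MongoDB database: drop any leftover data,
 * then initialize a default repository once so the root structure exists.
 */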
@Before
public void before() throws Exception {
dropDB();
initRepository();
}
@After
public void after() throws Exception {
workers.clear();
for (Repository repo : repos) {
dispose(repo);
}
repos.clear();
for (DocumentMK mk : mks) {
mk.dispose();
}
mks.clear();
dropDB();
}
@Test
public void addNodesConcurrent() throws Exception {
for (int i = 0; i < NUM_CLUSTER_NODES; i++) {
MongoConnection c = createConnection();
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(c.getMongoClient(), c.getDBName())
.setClusterId(i + 1).open();
mks.add(mk);
}
Map<String, Exception> exceptions = Collections.synchronizedMap(
new HashMap<String, Exception>());
for (int i = 0; i < mks.size(); i++) {
DocumentMK mk = mks.get(i);
Repository repo = new Jcr(mk.getNodeStore()).createRepository();
repos.add(repo);
workers.add(new Thread(new Worker(repo, exceptions), "Worker-" + (i + 1)));
}
for (Thread t : workers) {
t.start();
}
for (Thread t : workers) {
t.join();
}
// fail the test with the first exception recorded by a worker
for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
throw entry.getValue();
}
}
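/**
 * Many workers per cluster node keep adding nodes below the root node,
 * with one extra long-running worker per node that loops until the test
 * stops. Ignored pending OAK-1807.
 */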
@Ignore("OAK-1807")
@Test
public void addNodesConcurrent2() throws Exception {
final Thread mainThread = Thread.currentThread();
for (int i = 0; i < NUM_CLUSTER_NODES; i++) {
MongoConnection c = createConnection();
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(c.getMongoClient(), c.getDBName())
.setClusterId(i + 1).open();
mks.add(mk);
}
final Map<String, Exception> exceptions = Collections.synchronizedMap(
new HashMap<String, Exception>());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean stop = new AtomicBoolean();
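// print uncaught worker exceptions so they are not lost silently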
final UncaughtExceptionHandler ueh = new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
RuntimeException r = new RuntimeException("Exception in thread " + t.getName(), e);
r.printStackTrace();
}
};
for (int i = 0; i < mks.size(); i++) {
DocumentMK mk = mks.get(i);
final Repository repo = new Jcr(mk.getNodeStore()).createRepository();
repos.add(repo);
for (int w = 0; w <= WORKER_COUNT; w++) { // '<=': one extra, long-running worker per cluster node
final String name = "Worker-" + (i + 1) + "-" + (w + 1);
final Runnable r = new Runnable() {
final Session session = createAdminSession(repo);
int count = 0;
@Override
public void run() {
try {
Uninterruptibles.awaitUninterruptibly(latch);
session.refresh(false);
Node node = session.getRootNode().addNode(name + count++, "oak:Unstructured");
for (int j = 0; j < NODE_COUNT && !stop.get(); j++) {
node.addNode("node" + j);
session.save();
}
} catch (RepositoryException e) {
RuntimeException r = new RuntimeException("Exception in thread " + name, e);
r.printStackTrace();
exceptions.put(Thread.currentThread().getName(), r);
stop.set(true);
mainThread.interrupt(); // wake the main thread from its sleep so the test fails fast
} finally {
session.logout();
}
}
};
// the last runnable is a long-running one that keeps adding nodes until the test stops
Runnable runnable = r;
if (w == WORKER_COUNT) {
runnable = new Runnable() {
@Override
public void run() {
while (!stop.get()) {
r.run();
}
}
};
}
Thread t = new Thread(runnable);
t.setName(name);
t.setUncaughtExceptionHandler(ueh);
workers.add(t);
}
}
for (Thread t : workers) {
t.start();
}
latch.countDown();
TimeUnit.MINUTES.sleep(10); // let the workers run for ten minutes before stopping them
stop.set(true);
for (Thread t : workers) {
t.join();
}
// fail the test with the first exception recorded by a worker
for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
throw entry.getValue();
}
}
@Test
public void addNodes() throws Exception {
for (int i = 0; i < 2; i++) {
MongoConnection c = createConnection();
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(c.getMongoClient(), c.getDBName())
.setClusterId(i + 1).open();
mks.add(mk);
}
final DocumentMK mk1 = mks.get(0);
final DocumentMK mk2 = mks.get(1);
Repository r1 = new Jcr(mk1.getNodeStore()).createRepository();
repos.add(r1);
Repository r2 = new Jcr(mk2.getNodeStore()).createRepository();
repos.add(r2);
Session s1 = createAdminSession(r1);
Session s2 = createAdminSession(r2);
ensureIndex(s1.getRootNode(), PROP_NAME);
ensureIndex(s2.getRootNode(), PROP_NAME);
Map<String, Exception> exceptions = Collections.synchronizedMap(
new HashMap<String, Exception>());
createNodes(s1, "testroot-1", 1, 1, exceptions);
createNodes(s2, "testroot-2", 1, 1, exceptions);
for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
throw entry.getValue();
}
s1.logout();
s2.logout();
}
@Test
public void addNodes2() throws Exception {
for (int i = 0; i < 3; i++) {
MongoConnection c = createConnection();
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(c.getMongoClient(), c.getDBName())
.setAsyncDelay(0)
.setClusterId(i + 1).open();
mks.add(mk);
}
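// asyncDelay 0 disables the periodic background operations; the test
// triggers them explicitly via runBackgroundOps() to control when changes
// become visible to the other cluster nodes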
final DocumentMK mk1 = mks.get(0);
final DocumentMK mk2 = mks.get(1);
final DocumentMK mk3 = mks.get(2);
Repository r1 = new Jcr(mk1.getNodeStore()).createRepository();
repos.add(r1);
Repository r2 = new Jcr(mk2.getNodeStore()).createRepository();
repos.add(r2);
Repository r3 = new Jcr(mk3.getNodeStore()).createRepository();
repos.add(r3);
Session s1 = createAdminSession(r1);
Session s2 = createAdminSession(r2);
Session s3 = createAdminSession(r3);
ensureIndex(s1.getRootNode(), PROP_NAME);
runBackgroundOps(mk1);
runBackgroundOps(mk2);
runBackgroundOps(mk3);
// begin test: interleave saves and explicitly triggered background ops
// across the three sessions in a fixed order
Node root2 = s2.getRootNode().addNode("testroot-Worker-2", "nt:unstructured");
createNodes(root2, "testnode0");
s2.save();
createNodes(root2, "testnode1");
runBackgroundOps(mk1);
runBackgroundOps(mk3);
runBackgroundOps(mk2); // publish 'testroot-Worker-2/testnode0'
Node root3 = s3.getRootNode().addNode("testroot-Worker-3", "nt:unstructured");
createNodes(root3, "testnode0");
s2.save();
createNodes(root2, "testnode2");
runBackgroundOps(mk1); // sees 'testroot-Worker-2/testnode0'
runBackgroundOps(mk3); // sees 'testroot-Worker-2/testnode0'
runBackgroundOps(mk2); // publish 'testroot-Worker-2/testnode1'
// subsequent read on mk3 will read already published docs from mk2
s3.save();
createNodes(root3, "testnode1");
Node root1 = s1.getRootNode().addNode("testroot-Worker-1", "nt:unstructured");
createNodes(root1, "testnode0");
s2.save();
createNodes(root2, "testnode3");
runBackgroundOps(mk1);
runBackgroundOps(mk3);
runBackgroundOps(mk2);
s1.save();
createNodes(root1, "testnode1");
s3.save();
createNodes(root3, "testnode2");
runBackgroundOps(mk1);
s1.save();
s1.logout();
s2.logout();
s3.logout();
}
@Test
public void rebaseVisibility() throws Exception {
for (int i = 0; i < 2; i++) {
MongoConnection c = createConnection();
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(c.getMongoClient(), c.getDBName())
.setAsyncDelay(0)
.setClusterId(i + 1).open();
mks.add(mk);
}
final DocumentMK mk1 = mks.get(0);
final DocumentMK mk2 = mks.get(1);
Repository r1 = new Jcr(mk1.getNodeStore()).createRepository();
repos.add(r1);
Repository r2 = new Jcr(mk2.getNodeStore()).createRepository();
repos.add(r2);
Session s1 = createAdminSession(r1);
Session s2 = createAdminSession(r2);
Node root1 = s1.getRootNode().addNode("session-1");
s1.save();
Node root2 = s2.getRootNode().addNode("session-2");
s2.save();
runBackgroundOps(mk1);
runBackgroundOps(mk2);
runBackgroundOps(mk1);
createNodes(root1, "nodes");
createNodes(root2, "nodes");
s2.save();
runBackgroundOps(mk2);
runBackgroundOps(mk1);
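// s1 is still based on its old revision and must not see the changes from
// cluster node 2 until refresh(true) rebases the session onto them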
assertFalse(s1.getRootNode().hasNode("session-2/nodes"));
s1.refresh(true);
assertTrue(s1.getRootNode().hasNode("session-2/nodes"));
s1.logout();
s2.logout();
}
private static MongoConnection createConnection() throws Exception {
return OakMongoNSRepositoryStub.createConnection(
ConcurrentAddNodesClusterIT.class.getSimpleName());
}
private static void dropDB() throws Exception {
MongoConnection con = createConnection();
try {
con.getDatabase().drop();
} finally {
con.close();
}
}
private static void initRepository() throws Exception {
MongoConnection con = createConnection();
DocumentMK mk = new DocumentMK.Builder()
.setMongoDB(con.getMongoClient(), con.getDBName())
.setClusterId(1).open();
Repository repository = new Jcr(mk.getNodeStore()).createRepository();
Session session = repository.login(
new SimpleCredentials("admin", "admin".toCharArray()));
session.logout();
dispose(repository);
mk.dispose(); // closes connection as well
}
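/**
 * Creates a property index definition for the given property if one does
 * not exist yet. Concurrent creation by another session or cluster node
 * is tolerated.
 */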
private static void ensureIndex(Node root, String propertyName)
throws RepositoryException {
Node indexDef = root.getNode(IndexConstants.INDEX_DEFINITIONS_NAME);
if (indexDef.hasNode(propertyName)) {
return;
}
Node index = indexDef.addNode(propertyName, IndexConstants.INDEX_DEFINITIONS_NODE_TYPE);
index.setProperty(IndexConstants.TYPE_PROPERTY_NAME,
PropertyIndexEditorProvider.TYPE);
index.setProperty(IndexConstants.REINDEX_PROPERTY_NAME,
true);
index.setProperty(IndexConstants.PROPERTY_NAMES,
new String[] { propertyName }, PropertyType.NAME);
try {
root.getSession().save();
} catch (RepositoryException e) {
// index was concurrently created by another session or cluster node -> discard our changes
root.getSession().refresh(false);
}
}
private static void runBackgroundOps(DocumentMK mk) throws Exception {
mk.getNodeStore().runBackgroundOperations();
}
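/**
 * Creates LOOP_COUNT batches of NODE_COUNT indexed nodes with its own
 * admin session and records any failure in the shared exceptions map.
 */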
private final class Worker implements Runnable {
private final Repository repo;
private final Map<String, Exception> exceptions;
Worker(Repository repo, Map<String, Exception> exceptions) {
this.repo = repo;
this.exceptions = exceptions;
}
@Override
public void run() {
try {
Session session = createAdminSession(repo);
try {
ensureIndex(session.getRootNode(), PROP_NAME);
String nodeName = "testroot-" + Thread.currentThread().getName();
createNodes(session, nodeName, LOOP_COUNT, NODE_COUNT, exceptions);
} finally {
session.logout();
}
} catch (Exception e) {
exceptions.put(Thread.currentThread().getName(), e);
}
}
}
private Session createAdminSession(Repository repository) throws RepositoryException {
return repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
}
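/**
 * Adds loopCount batches of nodeCount indexed child nodes under a new
 * node below the root, saving after each batch. Stops early once any
 * worker has recorded an exception.
 */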
private void createNodes(Session session,
String nodeName,
int loopCount,
int nodeCount,
Map<String, Exception> exceptions)
throws RepositoryException {
Node root = session.getRootNode().addNode(nodeName, "nt:unstructured");
for (int i = 0; i < loopCount; i++) {
Node node = root.addNode("testnode" + i, "nt:unstructured");
for (int j = 0; j < nodeCount; j++) {
Node child = node.addNode("node" + j, "nt:unstructured");
child.setProperty(PROP_NAME, j);
}
if (!exceptions.isEmpty()) {
break;
}
session.save();
}
}
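/**
 * Adds a single batch of NODE_COUNT indexed child nodes below the given
 * parent without saving the session.
 */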
private void createNodes(Node parent, String child)
throws RepositoryException {
Node node = parent.addNode(child, "nt:unstructured");
for (int i = 0; i < NODE_COUNT; i++) {
Node c = node.addNode("node" + i, "nt:unstructured");
c.setProperty(PROP_NAME, i);
}
}
}