blob: d7fe74bd253b99dc596d6b4454baa2bc93c57f22 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.jcr;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.PathNotFoundException;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;
import javax.jcr.Value;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import javax.jcr.security.AccessControlManager;
import javax.jcr.security.Privilege;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.api.JackrabbitSession;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.api.security.JackrabbitAccessControlList;
import org.apache.jackrabbit.api.security.user.User;
import org.apache.jackrabbit.api.security.user.UserManager;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.jackrabbit.commons.jackrabbit.authorization.AccessControlUtils;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.fixture.DocumentMongoFixture;
import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
import org.apache.jackrabbit.oak.jcr.cluster.AbstractClusterTest;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.getServices;
@Ignore("Ignore long running ObservationQueueTest")
public class ObservationQueueTest extends AbstractClusterTest {
private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueTest.class);
static final long RUNTIME = TimeUnit.MINUTES.toMillis(10);
static final int NUM_WRITERS = 10;
static final int NUM_READERS = 10;
static final int NUM_OBSERVERS = 10;
static final int MAX_NODES_PER_WRITE = 30;
static final int QUEUE_LENGTH = 1000;
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final long MB = 1024 * 1024;
private static final int NUM_CHILDREN = 100;
private List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
private List<Session> sessions = Lists.newArrayList();
private List<Thread> writers = Lists.newArrayList();
private List<Thread> readers = Lists.newArrayList();
private List<Thread> observers = Lists.newArrayList();
private List<Thread> loggers = Lists.newArrayList();
@BeforeClass
public static void dropDB() {
MongoUtils.dropDatabase("oak-test");
}
@After
public void logoutSessions() throws Exception {
for (Session s : sessions) {
s.logout();
}
}
@Test
public void heavyLoad() throws Throwable {
List<Whiteboard> whiteboards = Lists.newArrayList(w1, w2);
Iterator<Repository> repos = Iterators.cycle(r1, r2);
AtomicLong commitCounter = new AtomicLong();
for (int i = 0; i < NUM_WRITERS; i++) {
Session s = loginUser(repos.next());
Node n = s.getRootNode().addNode("session-" + i, "oak:Unstructured");
s.save();
writers.add(new Thread(new Writer(n, commitCounter)));
}
for (int i = 0; i < NUM_READERS; i++) {
Session s = loginUser(repos.next());
readers.add(new Thread(new Reader(s)));
}
AtomicInteger queueLength = new AtomicInteger();
loggers.add(new Thread(new QueueLogger(whiteboards, queueLength, commitCounter)));
for (int i = 0; i < NUM_OBSERVERS; i++) {
Session s = loginUser(repos.next());
observers.add(new Thread(new Observer(s, queueLength)));
}
for (Thread t : Iterables.concat(writers, readers, observers, loggers)) {
t.start();
}
for (Thread t : Iterables.concat(writers, readers)) {
t.join();
}
LOG.info("Writes stopped. Waiting for observers...");
for (Thread t : Iterables.concat(observers, loggers)) {
t.join();
}
for (Throwable t : exceptions) {
throw t;
}
}
@Override
protected NodeStoreFixture getFixture() {
return new DocumentMongoFixture() {
@Override
public NodeStore createNodeStore(int clusterNodeId) {
MongoConnection c = MongoUtils.getConnection("oak-test");
return new DocumentMK.Builder().setClusterId(clusterNodeId)
.setMongoDB(c.getMongoClient(), c.getDBName())
.setPersistentCache("target/persistentCache" + clusterNodeId + ",time,size=128")
.setJournalCache("target/journalCache" + clusterNodeId + ",time,size=128")
.memoryCacheSize(128 * MB)
.setExecutor(Executors.newCachedThreadPool())
.getNodeStore();
}
};
}
@Override
protected void prepareTestData(Session s) throws RepositoryException {
UserManager uMgr = ((JackrabbitSession) s).getUserManager();
User user = uMgr.createUser(USER, PASSWORD);
s.save();
AccessControlManager acMgr = s.getAccessControlManager();
JackrabbitAccessControlList tmpl = AccessControlUtils.getAccessControlList(acMgr, "/");
tmpl.addEntry(user.getPrincipal(), new Privilege[]{acMgr.privilegeFromName(Privilege.JCR_ALL)},
true, Collections.<String, Value>emptyMap());
acMgr.setPolicy(tmpl.getPath(), tmpl);
s.save();
}
@Override
protected Jcr customize(Jcr jcr) {
return super.customize(jcr).withObservationQueueLength(QUEUE_LENGTH);
}
private Session loginUser(Repository repo) throws RepositoryException {
Session s = repo.login(new SimpleCredentials(USER, PASSWORD.toCharArray()));
sessions.add(s);
return s;
}
private static void log(String msg, Object... args) {
LOG.debug(msg, args);
}
private static void logRead(Node n) throws RepositoryException {
log("Read node {}", n.getPath());
}
private class Reader extends Task {
private final Random r;
public Reader(Session s)
throws RepositoryException {
super(s);
this.r = new Random();
}
@Override
void perform() throws Exception {
s.refresh(false);
List<Node> nodes = Lists.newArrayList();
for (NodeIterator it = s.getRootNode().getNodes(); it.hasNext(); ) {
Node n = it.nextNode();
if (n.getName().startsWith("session-")) {
nodes.add(n);
}
}
if (nodes.isEmpty()) {
return;
}
Node node = nodes.get(r.nextInt(nodes.size()));
for (int i = 0; i < 2 && node != null; i++) {
try {
node = node.getNode("node-" + r.nextInt(NUM_CHILDREN));
logRead(node);
} catch (PathNotFoundException e) {
// ignore
}
}
if (node != null && node.hasProperty("c")) {
long count = node.getProperty("c").getLong();
logRead(node.getNode("node-" + r.nextInt((int) count)));
}
}
}
private class Writer extends Task {
private final Random r;
private final Node node;
private final AtomicLong commitCounter;
public Writer(Node node, AtomicLong commitCounter) throws RepositoryException {
super(node.getSession());
this.r = new Random();
this.node = node;
this.commitCounter = commitCounter;
}
@Override
void perform() throws Exception {
Node p = node;
for (int i = 0; i < 2; i++) {
p = JcrUtils.getOrAddNode(p, "node-" + r.nextInt(NUM_CHILDREN), "oak:Unstructured");
}
// set property or add node?
if (r.nextBoolean()) {
long v = p.setProperty("p", r.nextInt()).getLong();
log("Set property to {} on {}", v, p.getPath());
} else {
int numNodes = r.nextInt(MAX_NODES_PER_WRITE) + 1;
int depth = (int) Math.ceil(Math.log(numNodes) / Math.log(3));
long count = JcrUtils.getLongProperty(p, "c", 0);
Node n = p.addNode("node-" + count, "oak:Unstructured");
p.setProperty("c", ++count);
createNodes(n, new AtomicInteger(--numNodes), depth);
log("Add node {}", n.getPath());
}
s.save();
commitCounter.incrementAndGet();
}
void createNodes(Node parent, AtomicInteger remaining, int depth)
throws RepositoryException {
depth--;
for (int i = 0; i < 3 && remaining.get() > 0; i++) {
Node n = parent.addNode("node-" + i);
log("Add node {}", n.getPath());
remaining.decrementAndGet();
if (depth > 0) {
createNodes(n, remaining, depth);
}
}
}
}
private class Observer extends Task implements EventListener {
private final AtomicLong numEvents = new AtomicLong();
private final AtomicInteger queueLength;
public Observer(Session s, AtomicInteger queueLength)
throws RepositoryException {
super(s);
this.queueLength = queueLength;
s.getWorkspace().getObservationManager()
.addEventListener(this,
Event.NODE_ADDED | Event.PROPERTY_ADDED,
"/", true, null, null, false);
}
@Override
void perform() throws Exception {
Thread.sleep(1000);
}
@Override
protected boolean running() {
return super.running() || queueLength.get() > 0;
}
@Override
public void onEvent(EventIterator events) {
while (events.hasNext()) {
numEvents.incrementAndGet();
try {
Event e = events.nextEvent();
String p = e.getPath();
log("Event received {}", p);
if (e.getType() == Event.PROPERTY_ADDED) {
p = PathUtils.getParentPath(p);
}
s.getNode(p);
} catch (RepositoryException e) {
exceptions.add(e);
}
}
}
}
private class QueueLogger extends Task {
private final List<Whiteboard> whiteboards;
private final AtomicInteger queueLength;
private final AtomicLong commitCounter;
QueueLogger(List<Whiteboard> whiteboards,
AtomicInteger queueLength,
AtomicLong commitCounter) {
super(null);
this.whiteboards = whiteboards;
this.queueLength = queueLength;
this.commitCounter = commitCounter;
}
@Override
void perform() throws Exception {
List<String> stats = Lists.newArrayList();
for (Whiteboard w : whiteboards) {
stats.add(queueStats(w));
}
LOG.info("Observation queue stats: {}, commits: {}",
stats, commitCounter.get());
Thread.sleep(1000);
}
@Override
protected boolean running() {
return super.running() || queueLength.get() > 0;
}
private String queueStats(Whiteboard w) {
int len = -1;
int ext = -1;
for (BackgroundObserverMBean bean : getServices(w, BackgroundObserverMBean.class)) {
len = Math.max(bean.getQueueSize(), len);
ext = Math.max(bean.getExternalEventCount(), ext);
}
if (len >= 0) {
queueLength.set(len);
}
int numListeners = 0;
long micros = 0;
for (EventListenerMBean bean : getServices(w, EventListenerMBean.class)) {
micros += bean.getMicrosecondsPerEventDelivery();
numListeners++;
}
List<String> stats = Lists.newArrayList();
stats.add(String.valueOf(len));
stats.add(String.valueOf(ext));
stats.add(String.valueOf(micros / numListeners));
return stats.toString();
}
}
abstract class Task implements Runnable {
final Session s;
long end;
public Task(Session s) {
this.s = s;
}
protected boolean running() {
return System.currentTimeMillis() < end;
}
@Override
public void run() {
try {
end = System.currentTimeMillis() + RUNTIME;
while (running()) {
perform();
}
} catch (Throwable t) {
exceptions.add(t);
}
}
abstract void perform() throws Exception;
}
}