blob: 9ec56c948ecf70a09ebfe6dbc0f3a966477c44b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.jcr;
import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PROP_COUNTER;
import static org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor.PROP_INCREMENT;
import static org.apache.jackrabbit.oak.spi.nodetype.NodeTypeConstants.MIX_ATOMIC_COUNTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assume.assumeTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.Session;
import org.apache.jackrabbit.oak.commons.FixturesHelper;
import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFutureTask;
public class AtomicCounterClusterIT extends DocumentClusterIT {
private static final Set<Fixture> FIXTURES = FixturesHelper.getFixtures();
private static final Logger LOG = LoggerFactory.getLogger(AtomicCounterClusterIT.class);
private static final PerfLogger LOG_PERF = new PerfLogger(LOG);
private List<CustomScheduledExecutor> executors = Lists.newArrayList();
@BeforeClass
public static void assumtions() {
assumeTrue(FIXTURES.contains(Fixture.DOCUMENT_NS));
assumeTrue(OakMongoNSRepositoryStub.isMongoDBAvailable());
}
@Override
public void before() throws Exception {
super.before();
executors = Lists.newArrayList();
}
@Override
public void after() throws Exception {
super.after();
for (CustomScheduledExecutor exec : executors) {
new ExecutorCloser(exec, 10, TimeUnit.SECONDS).close();
}
}
@Test
public void increments() throws Exception {
setUpCluster(this.getClass(), mks, repos, NOT_PROVIDED);
assertEquals("repositories and executors should match", repos.size(), executors.size());
final String counterPath;
final Random rnd = new Random(14);
final AtomicLong expected = new AtomicLong(0);
final Map<String, Exception> exceptions = Collections.synchronizedMap(
new HashMap<String, Exception>());
// setting-up the repo state
Repository repo = repos.get(0);
Session session = repo.login(ADMIN);
Node counter;
try {
counter = session.getRootNode().addNode("counter");
counter.addMixin(MIX_ATOMIC_COUNTER);
session.save();
counterPath = counter.getPath();
} finally {
session.logout();
}
// allow the cluster to align
alignCluster(mks);
// asserting the initial state
assertFalse("Path to the counter node should be set", Strings.isNullOrEmpty(counterPath));
for (Repository r : repos) {
try {
session = r.login(ADMIN);
counter = session.getNode(counterPath);
assertEquals("Nothing should have touched the `expected`", 0, expected.get());
assertEquals(
"Wrong initial counter",
expected.get(),
counter.getProperty(PROP_COUNTER).getLong());
} finally {
session.logout();
}
}
// number of threads per cluster node
final int numIncrements = Integer.getInteger("oak.test.it.atomiccounter.threads", 100);
LOG.debug(
"pushing {} increments per each of the {} cluster nodes for a total of {} concurrent updates",
numIncrements, repos.size(), numIncrements * repos.size());
// for each cluster node, `numIncrements` sessions pushing random increments
long start = LOG_PERF.start("Firing the threads");
List<ListenableFutureTask<Void>> tasks = Lists.newArrayList();
for (Repository rep : repos) {
final Repository r = rep;
for (int i = 0; i < numIncrements; i++) {
ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() {
@Override
public Void call() throws Exception {
Session s = r.login(ADMIN);
try {
try {
Node n = s.getNode(counterPath);
int increment = rnd.nextInt(10) + 1;
n.setProperty(PROP_INCREMENT, increment);
expected.addAndGet(increment);
s.save();
} finally {
s.logout();
}
} catch (Exception e) {
exceptions.put(Thread.currentThread().getName(), e);
}
return null;
}
});
new Thread(task).start();
tasks.add(task);
}
}
LOG_PERF.end(start, -1, "Firing threads completed", "");
Futures.allAsList(tasks).get();
LOG_PERF.end(start, -1, "Futures completed", "");
waitForTaskCompletion();
LOG_PERF.end(start, -1, "All tasks completed", "");
// let the time for the async process to kick in and run.
Thread.sleep(5000);
raiseExceptions(exceptions, LOG);
// assert the final situation
for (int i = 0; i < repos.size(); i++) {
Repository r = repos.get(i);
try {
session = r.login(ADMIN);
counter = session.getNode(counterPath);
LOG.debug("Cluster node: {}, actual counter: {}, expected counter: {}", i + 1,
expected.get(), counter.getProperty(PROP_COUNTER).getLong());
assertEquals(
"Wrong counter on node " + (i + 1),
expected.get(),
counter.getProperty(PROP_COUNTER).getLong());
} finally {
session.logout();
}
}
}
private void waitForTaskCompletion() throws InterruptedException {
int remainingTasks;
do {
remainingTasks = 0;
for (CustomScheduledExecutor e : executors) {
remainingTasks += e.getTotal();
}
if (remainingTasks > 0) {
LOG.debug("there are approximately {} tasks left to complete. Sleeping 1 sec",
remainingTasks);
Thread.sleep(1000);
}
} while (remainingTasks > 0);
}
private class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
private volatile AtomicInteger total = new AtomicInteger();
private class CustomTask<V> implements RunnableScheduledFuture<V> {
private final RunnableScheduledFuture<V> task;
public CustomTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
this.task = task;
}
@Override
public void run() {
task.run();
total.decrementAndGet();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return task.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return task.isCancelled();
}
@Override
public boolean isDone() {
return task.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException {
return task.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return task.get(timeout, unit);
}
@Override
public long getDelay(TimeUnit unit) {
return task.getDelay(unit);
}
@Override
public int compareTo(Delayed o) {
return task.compareTo(o);
}
@Override
public boolean isPeriodic() {
return task.isPeriodic();
}
}
public CustomScheduledExecutor(int corePoolSize) {
super(corePoolSize);
total.set(0);
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable,
RunnableScheduledFuture<V> task) {
if (callable instanceof AtomicCounterEditor.ConsolidatorTask) {
total.incrementAndGet();
return new CustomTask<V>(callable, task);
} else {
return super.decorateTask(callable, task);
}
}
/**
* return the approximate amount of tasks to be completed
* @return
*/
public synchronized int getTotal() {
return total.get();
}
}
@Override
protected Jcr getJcr(NodeStore store) {
CustomScheduledExecutor e = new CustomScheduledExecutor(10);
executors.add(e);
return super.getJcr(store)
.with(e)
.withAtomicCounter();
}
}