blob: d0eda68f52806cc4cb3a07e3fe04f5880eaf4c72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
/**
 * Updates multiple nodes in the same commit with multiple threads and verifies
 * the commit is atomic.
 * <p>
 * The test seeds {@code NUM_NODES} nodes, each with a numeric {@code value}
 * property set to {@code INITIAL_VALUE}. {@code NUM_WRITERS} threads, each
 * with its own {@link DocumentMK} on the shared store, then repeatedly move
 * amounts between three randomly chosen nodes in a single commit. Every
 * transfer is sum-neutral, so the total over all nodes must stay at
 * {@code EXPECTED_SUM} in every revision a writer observes and in the final
 * head revision.
 */
public class ConcurrentConflictTest extends BaseDocumentMKTest {

    /** When {@code true} messages go to {@link #LOG}, otherwise to {@link #logBuffer}. */
    private static final boolean USE_LOGGER = true;

    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentConflictTest.class);

    /** Number of concurrent writer threads (one {@link DocumentMK} each). */
    private static final int NUM_WRITERS = 3;

    /** Number of {@code /node-<i>} nodes the transfers operate on. */
    private static final int NUM_NODES = 10;

    /** Number of transfer attempts (successful or conflicting) per writer. */
    private static final int NUM_TRANSFERS_PER_THREAD = 100;

    /** Value every node starts with. */
    private static final int INITIAL_VALUE = 100;

    /** Invariant checked after each transfer and at the end of the test. */
    private static final long EXPECTED_SUM = (long) NUM_NODES * INITIAL_VALUE;

    /** Store shared by all kernels opened by this test. */
    private DocumentStore store;

    /** Kernels used by the writer threads, one per writer. */
    private final List<DocumentMK> kernels = new ArrayList<DocumentMK>();

    /** Collects log messages when {@link #USE_LOGGER} is {@code false}. */
    private final StringBuilder logBuffer = new StringBuilder();

    /**
     * Creates a fresh in-memory store, seeds the test nodes with cluster id 1
     * and opens one kernel per writer with cluster ids starting at 2.
     */
    @Before
    @Override
    public void initDocumentMK() {
        logBuffer.setLength(0);
        this.store = new MemoryDocumentStore();
        DocumentMK seeder = openDocumentMK(1);
        try {
            for (int i = 0; i < NUM_NODES; i++) {
                seeder.commit("/", "+\"node-" + i + "\":{\"value\":" + INITIAL_VALUE + "}", null, null);
            }
        } finally {
            // dispose even when seeding fails so the kernel does not leak
            seeder.dispose();
        }
        for (int i = 0; i < NUM_WRITERS; i++) {
            kernels.add(openDocumentMK(i + 2));
        }
    }

    /**
     * Disposes the kernel managed by the base class and all writer kernels.
     */
    @After
    @Override
    public void disposeDocumentMK() {
        super.disposeDocumentMK();
        for (DocumentMK mk : kernels) {
            mk.dispose();
        }
        kernels.clear();
    }

    /**
     * Opens a new kernel on the shared {@link #store}.
     *
     * @param clusterId the cluster id to assign to the kernel.
     * @return the opened kernel; the caller is responsible for disposing it.
     */
    private DocumentMK openDocumentMK(int clusterId) {
        DocumentMK.Builder builder = new DocumentMK.Builder();
        return builder.setAsyncDelay(10)
                .setDocumentStore(store)
                .setClusterId(clusterId)
                .open();
    }

    /**
     * Runs the concurrent transfers with every transfer committed on a branch
     * and merged afterwards.
     */
    @Test
    public void concurrentUpdatesWithBranch() throws Exception {
        concurrentUpdates(true);
    }

    /**
     * Runs the concurrent transfers with every transfer committed directly on
     * top of the head revision.
     */
    @Test
    public void concurrentUpdates() throws Exception {
        concurrentUpdates(false);
    }

    /**
     * Repeats {@link #concurrentUpdates()} many times on a fresh store each
     * round. Ignored by default.
     */
    @Ignore("Enable to run concurrentUpdates() in a loop")
    @Test
    public void concurrentUpdates_Loop() throws Exception {
        for (int i = 0; i < 1000; i++) {
            System.out.println("test " + i);
            concurrentUpdates(false);
            // prepare for next round
            disposeDocumentMK();
            initDocumentMK();
        }
    }

    /**
     * Starts one writer thread per kernel, waits for all of them and then
     * verifies the sum over all nodes in the head revision of a newly opened
     * kernel.
     *
     * @param useBranch whether transfers are committed on a branch and merged
     *                  ({@code true}) or committed on the head revision.
     * @throws Exception the first failure recorded by a writer thread, or any
     *                   failure while verifying the result.
     */
    private void concurrentUpdates(final boolean useBranch) throws Exception {
        LOG.info("====== Start test =======");
        final AtomicInteger conflicts = new AtomicInteger();
        final List<Exception> exceptions = Collections.synchronizedList(
                new ArrayList<Exception>());
        List<Thread> writers = new ArrayList<Thread>();
        for (DocumentMK mk : kernels) {
            writers.add(new Thread(new TransferWriter(mk, useBranch, conflicts, exceptions)));
        }
        for (Thread t : writers) {
            t.start();
        }
        for (Thread t : writers) {
            t.join();
        }
        // dispose will flush all pending revisions
        for (DocumentMK mk : kernels) {
            mk.dispose();
        }
        DocumentMK verifier = openDocumentMK(1);
        try {
            String rev = verifier.getHeadRevision();
            long sum = calculateSum(verifier, rev);
            log("Conflict rate: " + conflicts.get() +
                    "/" + (NUM_WRITERS * NUM_TRANSFERS_PER_THREAD));
            System.out.print(logBuffer);
            assertEquals(EXPECTED_SUM, sum);
            if (!exceptions.isEmpty()) {
                throw exceptions.get(0);
            }
        } finally {
            // previously skipped whenever the assertion or the rethrow above fired
            verifier.dispose();
        }
    }

    /**
     * Sums the {@code value} property of all test nodes at the given revision.
     *
     * @param mk  the kernel to read from.
     * @param rev the revision to read.
     * @return the sum of all node values.
     * @throws Exception if reading or parsing a node fails.
     */
    static long calculateSum(DocumentMK mk, String rev) throws Exception {
        long sum = 0;
        for (int i = 0; i < NUM_NODES; i++) {
            sum += (Long) readNode(mk, i, rev).get("value");
        }
        return sum;
    }

    /**
     * Reads {@code /node-<index>} at the given revision and parses the
     * returned JSON.
     *
     * @param mk    the kernel to read from.
     * @param index the node index.
     * @param rev   the revision to read.
     * @return the parsed node.
     * @throws Exception if reading or parsing the node fails.
     */
    private static JSONObject readNode(DocumentMK mk, int index, String rev) throws Exception {
        String json = mk.getNodes("/node-" + index, rev, 0, 0, 1000, null);
        return (JSONObject) new JSONParser().parse(json);
    }

    /**
     * Writes a message to the logger or, when {@link #USE_LOGGER} is
     * {@code false}, appends it to the shared log buffer.
     *
     * @param msg the message to log.
     */
    void log(String msg) {
        if (USE_LOGGER) {
            LOG.info(msg);
        } else {
            synchronized (logBuffer) {
                logBuffer.append(msg).append("\n");
            }
        }
    }

    /**
     * Worker that performs {@code NUM_TRANSFERS_PER_THREAD} transfer attempts
     * on its own kernel. It stops early as soon as any writer has recorded a
     * failure.
     */
    private final class TransferWriter implements Runnable {

        private final DocumentMK mk;
        private final boolean useBranch;
        private final AtomicInteger conflicts;
        private final List<Exception> exceptions;
        private final Random random = new Random();
        /** The nodes picked for the current transfer, keyed by node index. */
        private final Map<Integer, JSONObject> nodes = new HashMap<Integer, JSONObject>();

        /**
         * @param mk         the kernel this writer commits with.
         * @param useBranch  whether to commit on a branch and merge.
         * @param conflicts  shared counter of failed transfers.
         * @param exceptions shared list of unexpected failures.
         */
        TransferWriter(DocumentMK mk,
                       boolean useBranch,
                       AtomicInteger conflicts,
                       List<Exception> exceptions) {
            this.mk = mk;
            this.useBranch = useBranch;
            this.conflicts = conflicts;
            this.exceptions = exceptions;
        }

        @Override
        public void run() {
            BitSet conflictSet = new BitSet();
            int numTransfers = 0;
            try {
                while (numTransfers < NUM_TRANSFERS_PER_THREAD && exceptions.isEmpty()) {
                    try {
                        if (!transfer()) {
                            // nothing was committed, does not count as an attempt
                            continue;
                        }
                    } catch (DocumentStoreException e) {
                        log("Failed transfer @" + mk.getHeadRevision());
                        // assume conflict
                        conflicts.incrementAndGet();
                        conflictSet.set(numTransfers);
                    }
                    numTransfers++;
                }
            } catch (Exception e) {
                exceptions.add(e);
            } catch (Error e) {
                // An Error would otherwise only terminate this thread and the
                // test could still pass; record it so the test fails.
                exceptions.add(new Exception("Writer thread failed", e));
            }
            log("conflicts (" + conflictSet.cardinality() + "): " + conflictSet);
        }

        /**
         * Picks three distinct random nodes and, in one commit, takes 20 from
         * the first picked node holding at least 20 and adds 10 to each of
         * the other two.
         *
         * @return {@code false} if the picked nodes hold less than 60 in
         *         total and nothing was committed, {@code true} if the
         *         transfer was committed.
         * @throws Exception if the sum over all nodes is not
         *                   {@code EXPECTED_SUM} after the transfer, or if
         *                   reading or committing fails.
         */
        private boolean transfer() throws Exception {
            // read 3 random nodes and re-distribute values
            nodes.clear();
            while (nodes.size() < 3) {
                nodes.put(random.nextInt(NUM_NODES), null);
            }
            String rev;
            if (useBranch) {
                rev = mk.branch(null);
            } else {
                rev = mk.getHeadRevision();
            }
            long sum = 0;
            for (Map.Entry<Integer, JSONObject> entry : nodes.entrySet()) {
                JSONObject obj = readNode(mk, entry.getKey(), rev);
                entry.setValue(obj);
                sum += (Long) obj.get("value");
            }
            // A total of at least 60 guarantees that one of the three nodes
            // holds at least 20, so the withdrawal below always happens and
            // the transfer is sum-neutral (-20 +10 +10).
            if (sum < 60) {
                // retry with other nodes
                return false;
            }
            StringBuilder jsop = new StringBuilder();
            boolean withdrawn = false;
            for (Map.Entry<Integer, JSONObject> entry : nodes.entrySet()) {
                long value = (Long) entry.getValue().get("value");
                jsop.append("^\"/node-").append(entry.getKey());
                jsop.append("/value\":");
                if (value >= 20 && !withdrawn) {
                    jsop.append(value - 20);
                    withdrawn = true;
                } else {
                    jsop.append(value + 10);
                }
            }
            String oldRev = rev;
            rev = mk.commit("", jsop.toString(), rev, null);
            if (useBranch) {
                rev = mk.merge(rev, null);
            }
            log("Successful transfer @" + oldRev + ": " + jsop.toString() + " (new rev: " + rev + ")");
            long s = calculateSum(mk, rev);
            if (s != EXPECTED_SUM) {
                throw new Exception("Sum mismatch: " + s);
            }
            return true;
        }
    }
}