/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.composite;

import com.google.common.io.Closer;
import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.oak.InitialContent;
import org.apache.jackrabbit.oak.Oak;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.api.PropertyValue;
import org.apache.jackrabbit.oak.api.QueryEngine;
import org.apache.jackrabbit.oak.api.Result;
import org.apache.jackrabbit.oak.api.Root;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.UUIDUtils;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.plugins.memory.PropertyValues;
import org.apache.jackrabbit.oak.spi.mount.Mounts;
import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jcr.query.Query;
import java.io.IOException;
import java.text.ParseException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.api.QueryEngine.NO_MAPPINGS;
import static org.junit.Assert.assertEquals;
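
/**
 * Tests that the jcr:uuid uniqueness constraint is enforced atomically when several
 * sessions commit concurrently through a {@link CompositeNodeStore}: exactly one of
 * the competing merges may succeed, all others must fail.
 */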
public class AtomicCompositeMergeTest {
private static final Logger LOG = LoggerFactory.getLogger(AtomicCompositeMergeTest.class);
private static final int THREADS = 6;
private static final String TEST_UUID = UUIDUtils.generateUUID();
private Closer closer;
@Before
public void initNodeStore() {
closer = Closer.create();
}
@After
public void closeAll() throws IOException {
closer.close();
}
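
/**
 * All writers share a single Oak instance backed by one composite store
 * (global store: {@link MemoryNodeStore}); the cluster id is ignored.
 */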
@Test
public void testLocalMerges() throws InterruptedException, IOException, ParseException, CommitFailedException {
Oak oak = getOak(getCompositeNodeStore(new MemoryNodeStore()));
testAtomicMerges(clusterId -> oak);
}
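
/**
 * Each writer gets its own {@link DocumentNodeStore} (one per cluster id), all sharing
 * the same {@link MemoryDocumentStore}, to simulate concurrent commits from different
 * cluster nodes.
 */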
@Test
public void testDistributedMerge() throws InterruptedException, IOException, ParseException, CommitFailedException {
MemoryDocumentStore sharedDocStore = new MemoryDocumentStore();
testAtomicMerges(clusterId -> {
DocumentNodeStore docNodeStore = new DocumentMK.Builder()
.setDocumentStore(sharedDocStore)
.setClusterId(clusterId)
.setUpdateLimit(10000)
.getNodeStore();
closer.register(() -> docNodeStore.dispose());
NodeStore compositeNodeStore = getCompositeNodeStore(docNodeStore);
return getOak(compositeNodeStore);
});
}
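
/**
 * Starts {@link #THREADS} writers that each try to commit a child node carrying the
 * same jcr:uuid. Exactly one merge is expected to succeed; the remaining
 * {@code THREADS - 1} must fail with a {@link CommitFailedException} caused by the
 * uuid uniqueness constraint.
 */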
private void testAtomicMerges(Function<Integer, Oak> oakSupplier) throws InterruptedException, IOException, ParseException, CommitFailedException {
Set<String> failedMerges = Collections.synchronizedSet(newHashSet());
List<Thread> threads = newArrayList();
ContentSession generalSession = oakSupplier.apply(100).createContentSession();
closer.register(generalSession);
waitForReindexing(generalSession);
sleep(1000); // give the freshly created repository a second to initialize itself
for (int i = 0; i < THREADS; i++) {
String name = "child_" + i;
Oak oak = oakSupplier.apply(i + 1);
ContentSession session = oak.createContentSession();
threads.add(new Thread(() -> {
LOG.info("Started thread {}", name);
try {
Root root = session.getLatestRoot();
root.getTree("/").addChild(name).setProperty(JcrConstants.JCR_UUID, TEST_UUID);
root.commit();
LOG.info("Merged successfully the node /{}: {}", name, root.getTree("/" + name));
} catch (CommitFailedException e) {
LOG.info("Expected failure", e);
failedMerges.add(name);
} catch (Exception e) {
LOG.error("Can't commit", e);
} finally {
IOUtils.closeQuietly(session);
}
}));
}
threads.forEach(Thread::start);
threads.forEach(AtomicCompositeMergeTest::join);
List<String> uuidPaths = waitForUuid(generalSession, TEST_UUID);
assertEquals("There should be just one indexed value for the TEST_UUID, but following are given: " + uuidPaths + ". Failed merge list: " + failedMerges, 1, uuidPaths.size());
assertEquals("There should be " + (THREADS - 1) + " failed merges, but following are given: " + failedMerges,THREADS - 1, failedMerges.size());
}
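
/**
 * Polls the uuid query for up to ~10 seconds (20 attempts, 500 ms apart) and returns
 * the matching paths, or an empty list if the uuid never shows up.
 */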
private static List<String> waitForUuid(ContentSession session, String uuid) throws ParseException {
for (int i = 0; i < 20; i++) {
List<String> result = queryUuid(session, uuid);
if (result.isEmpty()) {
sleep(500);
} else {
return result;
}
}
return Collections.emptyList();
}
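
/**
 * Runs a JCR-SQL2 query for nodes whose jcr:uuid equals the given value (passed as a
 * bind variable) and returns their paths.
 */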
private static List<String> queryUuid(ContentSession session, String uuid) throws ParseException {
Map<String, PropertyValue> bindings = Collections.singletonMap("id", PropertyValues.newString(uuid));
Result result = session.getLatestRoot().getQueryEngine().executeQuery(
"SELECT * FROM [nt:base] WHERE [jcr:uuid] = $id" + QueryEngine.INTERNAL_SQL2_QUERY,
Query.JCR_SQL2,
bindings, NO_MAPPINGS);
return StreamSupport.stream(result.getRows().spliterator(), false)
.map(r -> r.getPath())
.collect(Collectors.toList());
}
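
/**
 * Commits a temporary node with a fresh uuid and waits until it becomes queryable,
 * making sure the uuid index is ready before the concurrent writes start; the
 * temporary node is removed again afterwards.
 */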
private void waitForReindexing(ContentSession session) throws CommitFailedException, ParseException {
String tmpUuid = UUIDUtils.generateUUID();
Root root = session.getLatestRoot();
root.getTree("/").addChild("tmp").setProperty(JcrConstants.JCR_UUID, tmpUuid);
root.commit();
assertEquals(1, waitForUuid(session, tmpUuid).size());
root.getTree("/tmp").remove();
root.commit();
}
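
/** Joins the thread, rethrowing {@link InterruptedException} as an unchecked exception. */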
private static void join(Thread t) {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
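
/** Sleeps for the given number of milliseconds; interruption is logged but not propagated. */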
private static void sleep(long millis) {
if (millis <= 0) {
return;
}
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
LOG.error("Interrupted", e);
}
}
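
/** Wraps the given store as the global mount of a composite store with no additional mounts. */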
private static NodeStore getCompositeNodeStore(NodeStore globalNodeStore) {
return new CompositeNodeStore(Mounts.defaultMountInfoProvider(), globalNodeStore, Collections.emptyList());
}
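
/**
 * Creates an Oak instance with open security, the property index editor (which
 * enforces the unique uuid index) and the initial repository content.
 */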
private static Oak getOak(NodeStore nodeStore) {
return new Oak(nodeStore)
.with(new OpenSecurityProvider())
.with(new PropertyIndexEditorProvider())
.with(new InitialContent());
}
}