blob: b1377df499c6d9db5203f30b701b6ac4e9429524 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.jcr;
import static org.apache.jackrabbit.JcrConstants.NT_UNSTRUCTURED;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.spi.nodetype.NodeTypeConstants.NT_OAK_UNSTRUCTURED;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.jackrabbit.oak.commons.FixturesHelper;
import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
import org.jetbrains.annotations.NotNull;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.read.ListAppender;
import ch.qos.logback.core.spi.FilterReply;
import com.google.common.collect.ImmutableSet;
public class AsyncConflictsIT extends DocumentClusterIT {
private static final Set<Fixture> FIXTURES = FixturesHelper.getFixtures();
private static final String INDEX_DEF_NODE = "asyncconflict";
private static final String INDEX_PROPERTY = "number";
private static final Logger LOG = LoggerFactory.getLogger(AsyncConflictsIT.class);
@BeforeClass
public static void assumptions() {
assumeTrue(FIXTURES.contains(Fixture.DOCUMENT_NS));
assumeTrue(OakMongoNSRepositoryStub.isMongoDBAvailable());
}
@Test
public void updates() throws Exception {
final Map<String, Exception> exceptions = Collections.synchronizedMap(new HashMap<String, Exception>());
final Random generator = new Random(3);
final ListAppender<ILoggingEvent> logAppender = subscribeAppender();
setUpCluster(this.getClass(), mks, repos, NOT_PROVIDED);
defineIndex(repos.get(0));
final int numberNodes = 10000;
LOG.info("adding {} nodes", numberNodes);
Session s = repos.get(0).login(ADMIN);
Node test = s.getRootNode().addNode("test");
test.setPrimaryType(NT_OAK_UNSTRUCTURED);
try {
for (int i = 0; i < numberNodes; i++) {
test.addNode("node" + i);
test.setProperty(INDEX_PROPERTY, generator.nextInt(numberNodes/3));
if (i % 1024 == 0) {
s.save();
}
}
s.save();
} catch (Exception e) {
exceptions.put(Thread.currentThread().getName(), e);
} finally {
s.logout();
}
LOG.info("Nodes added.");
// issuing re-index
LOG.info("issuing re-index and wait for finish");
s = repos.get(0).login(ADMIN);
try {
Node index = s.getNode("/oak:index/" + INDEX_DEF_NODE);
index.setProperty(REINDEX_PROPERTY_NAME, true);
s.save();
} catch (Exception e) {
exceptions.put(Thread.currentThread().getName(), e);
} finally {
s.logout();
}
while (!isReindexFinished()) {
Thread.sleep(5000);
}
raiseExceptions(exceptions, LOG);
// if following fails it means the Async index failed at least once.
assertTrue(
String.format("We should have not any '%s' in the logs", AsyncLogFilter.MESSAGE),
logAppender.list.isEmpty());
unsubscribe(logAppender);
}
private boolean isReindexFinished() throws RepositoryException {
Session s = repos.get(0).login(ADMIN);
try {
boolean reindex = s.getNode("/oak:index/" + INDEX_DEF_NODE)
.getProperty(REINDEX_PROPERTY_NAME).getBoolean();
return !reindex;
} finally {
s.logout();
}
}
private void defineIndex(@NotNull final Repository repo) throws RepositoryException {
Session session = repo.login(ADMIN);
try {
Node n = session.getRootNode().getNode("oak:index");
n = n.addNode(INDEX_DEF_NODE);
n.setPrimaryType(IndexConstants.INDEX_DEFINITIONS_NODE_TYPE);
n.setProperty("compatVersion", 2);
n.setProperty(TYPE_PROPERTY_NAME, "lucene");
n.setProperty(ASYNC_PROPERTY_NAME, "async");
n = n.addNode("indexRules");
n.setPrimaryType(NT_UNSTRUCTURED);
n = n.addNode("nt:unstructured");
n = n.addNode("properties");
n.setPrimaryType(NT_UNSTRUCTURED);
n = n.addNode("number");
n.setPrimaryType(NT_UNSTRUCTURED);
n.setProperty("propertyIndex", true);
n.setProperty("name", INDEX_PROPERTY);
session.save();
} finally {
session.logout();
}
}
@Override
protected Set<IndexEditorProvider> additionalIndexEditorProviders() {
return ImmutableSet.of((IndexEditorProvider) new LuceneIndexEditorProvider());
}
@Override
protected boolean isAsyncIndexing() {
return true;
}
private ListAppender<ILoggingEvent> subscribeAppender() {
Filter<ILoggingEvent> filter = new AsyncLogFilter();
filter.start();
ListAppender<ILoggingEvent> appender = new ListAppender<ILoggingEvent>();
appender.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
appender.setName("asynclogcollector");
appender.addFilter(filter);
appender.start();
((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger(
ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(appender);
return appender;
}
private void unsubscribe(@NotNull final Appender<ILoggingEvent> appender) {
((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger(
ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).detachAppender(appender);
}
private static class AsyncLogFilter extends Filter<ILoggingEvent> {
public static final String MESSAGE = "Unresolved conflicts in /:async";
@Override
public FilterReply decide(ILoggingEvent event) {
final IThrowableProxy tp = event.getThrowableProxy();
if (event.getLevel().isGreaterOrEqual(Level.WARN) &&
tp != null &&
tp.getMessage().contains(MESSAGE)) {
return FilterReply.ACCEPT;
} else {
return FilterReply.DENY;
}
}
}
}