blob: 1fdcce43806403d11dba28479ca70392575524dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;
import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.ASYNC;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.DISABLE_INDEXES_ON_NEXT_CYCLE;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.SUPERSEDED_INDEX_PATHS;
import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.CompositeData;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler.CorruptIndexInfo;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.plugins.memory.PropertyValues;
import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.query.index.FilterImpl;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
import org.apache.jackrabbit.oak.spi.commit.DefaultValidator;
import org.apache.jackrabbit.oak.spi.commit.Editor;
import org.apache.jackrabbit.oak.spi.commit.EditorHook;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.commit.Validator;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.util.ISO8601;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import ch.qos.logback.classic.Level;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
public class AsyncIndexUpdateTest {
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private MetricStatisticsProvider statsProvider =
new MetricStatisticsProvider(ManagementFactory.getPlatformMBeanServer(),executor);
@After
public void shutDown(){
statsProvider.close();
new ExecutorCloser(executor).close();
}
// TODO test index config deletes
private static Set<String> find(PropertyIndexLookup lookup, String name,
String value) {
return Sets.newHashSet(lookup.query(FilterImpl.newTestInstance(), name,
PropertyValues.newString(value)));
}
private static NodeState checkPathExists(NodeState state, String... verify) {
NodeState c = state;
for (String p : verify) {
c = c.getChildNode(p);
assertTrue(c.exists());
}
return c;
}
/**
* Async Index Test
* <ul>
* <li>Add an index definition</li>
* <li>Add some content</li>
* <li>Search & verify</li>
* </ul>
*
*/
@Test
public void testAsync() throws Exception {
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
NodeState root = store.getRoot();
// first check that the index content nodes exist
checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
INDEX_CONTENT_NODE_NAME);
assertFalse(root.getChildNode(INDEX_DEFINITIONS_NAME).hasChildNode(
":conflict"));
PropertyIndexLookup lookup = new PropertyIndexLookup(root);
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
}
/**
* Async Index Test with 2 index defs at the same location
* <ul>
* <li>Add an index definition</li>
* <li>Add some content</li>
* <li>Search & verify</li>
* </ul>
*
*/
@Test
public void testAsyncDouble() throws Exception {
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndexSecond", true, false, ImmutableSet.of("bar"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc")
.setProperty("bar", "def");
builder.child("testSecond").setProperty("bar", "ghi");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
NodeState root = store.getRoot();
// first check that the index content nodes exist
checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
INDEX_CONTENT_NODE_NAME);
checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndexSecond",
INDEX_CONTENT_NODE_NAME);
PropertyIndexLookup lookup = new PropertyIndexLookup(root);
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
assertEquals(ImmutableSet.<String> of(), find(lookup, "foo", "def"));
assertEquals(ImmutableSet.<String> of(), find(lookup, "foo", "ghi"));
assertEquals(ImmutableSet.<String> of(), find(lookup, "bar", "abc"));
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "def"));
assertEquals(ImmutableSet.of("testSecond"), find(lookup, "bar", "ghi"));
}
/**
* Async Index Test with 2 index defs at different tree locations
* <ul>
* <li>Add an index definition</li>
* <li>Add some content</li>
* <li>Search & verify</li>
* </ul>
*
*/
@Test
public void testAsyncDoubleSubtree() throws Exception {
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
createIndexDefinition(
builder.child("newchild").child("other")
.child(INDEX_DEFINITIONS_NAME), "subIndex", true,
false, ImmutableSet.of("foo"), null).setProperty(
ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
builder.child("newchild").child("other").child("testChild")
.setProperty("foo", "xyz");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
NodeState root = store.getRoot();
// first check that the index content nodes exist
checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
INDEX_CONTENT_NODE_NAME);
checkPathExists(root, "newchild", "other", INDEX_DEFINITIONS_NAME,
"subIndex", INDEX_CONTENT_NODE_NAME);
PropertyIndexLookup lookup = new PropertyIndexLookup(root);
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
PropertyIndexLookup lookupChild = new PropertyIndexLookup(root
.getChildNode("newchild").getChildNode("other"));
assertEquals(ImmutableSet.of("testChild"),
find(lookupChild, "foo", "xyz"));
assertEquals(ImmutableSet.<String> of(),
find(lookupChild, "foo", "abc"));
}
@Test
public void testAsyncPause() throws Exception {
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.getIndexStats().pause();
async.run();
assertFalse(store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME)
.getChildNode("rootIndex")
.hasChildNode(INDEX_CONTENT_NODE_NAME));
async.getIndexStats().resume();
async.run();
NodeState root = store.getRoot();
// first check that the index content nodes exist
checkPathExists(root, INDEX_DEFINITIONS_NAME, "rootIndex",
INDEX_CONTENT_NODE_NAME);
assertFalse(root.getChildNode(INDEX_DEFINITIONS_NAME).hasChildNode(
":conflict"));
PropertyIndexLookup lookup = new PropertyIndexLookup(root);
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
}
@Test
// this test is very sensitive to timing
// - effectively it can only be run in standalone mode
@Ignore
public void disposeWhileIndexing() throws Exception {
final Semaphore merge = new Semaphore(0);
final AtomicBoolean isDisposed = new AtomicBoolean(false);
final Semaphore disposed = new Semaphore(0);
NodeStore store = new MemoryNodeStore() {
private void checkClosed() {
if (isDisposed.get()) {
// System.out.println(" Throwing \"This NodeStore is disposed\" " + this);
throw new IllegalStateException("This NodeStore is disposed");
}
}
@NotNull
@Override
public String checkpoint(long lifetime, @NotNull Map<String, String> properties) {
checkClosed();
final String checkpoint = super.checkpoint(lifetime, properties);
// System.out.println(" Created checkpoint " + checkpoint);
return checkpoint;
}
@Override
public synchronized NodeState merge(@NotNull NodeBuilder builder, @NotNull CommitHook commitHook, @NotNull CommitInfo info)
throws CommitFailedException {
checkClosed();
try {
return super.merge(builder, commitHook, info);
} finally {
// System.out.println(" Store after merge: " + this);
if (!merge.tryAcquire()) {
isDisposed.set(true);
disposed.release();
// System.out.println(" Now the disposed flag is set (after merge)");
}
}
}
@Override
public synchronized boolean release(String checkpoint) {
// currently no checkClosed is done for DocumentNodeStore here,
// so in this test variant we're also not doing it here (to reproduce the bug).
try {
return super.release(checkpoint);
} finally {
// System.out.println(" Released checkpoint " + checkpoint);
}
}
};
merge.release(100);
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo",
false, ImmutableSet.of("foo"), null, TYPE,
Collections.singletonMap(ASYNC_PROPERTY_NAME, "async"));
builder.child("test").setProperty("foo", "a");
builder.child("child");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final AtomicInteger reindexingCount = new AtomicInteger(0);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider) {
@Override
protected boolean updateIndex(NodeState before, String beforeCheckpoint, NodeState after, String afterCheckpoint,
String afterTime, AsyncUpdateCallback callback, AtomicReference<String> checkpointToRelease) throws CommitFailedException {
if (before == MISSING_NODE) {
reindexingCount.incrementAndGet();
}
return super.updateIndex(before, beforeCheckpoint, after, afterCheckpoint, afterTime, callback, checkpointToRelease);
}
};
async.setLeaseTimeOut(250);
async.run();
builder = store.getRoot().builder();
builder.child("test").setProperty("foo", "b");
builder.child("child").setProperty("prop", "value");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
merge.drainPermits();
merge.release(2);
// the first async.run() was a reindexing, but we're not counting that, so:
reindexingCount.set(0);
// System.out.println("Going for an async index run, reindex count = " + reindexingCount.get());
// with only 2 permits for the 'merge' semaphore,
// ie 2 calls to merge(), the async.run() below
// will hit the disposed NodeStore and trigger the bug.
// however, it will have released the checkpoint, as that
// is unprotected. so a subsequent call to async.run()
// will fail
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
async.run();
} catch(Throwable th) {
// th.printStackTrace();
}
}
});
t.start();
t.join(5000);
// System.out.println("Async index run done - restarting NodeStore, reindex count = " + reindexingCount.get());
// lets resume the NodeStore
merge.release(1000);
isDisposed.set(false);
// and rerun the async
// System.out.println("and going for another async index run, reindex count = " + reindexingCount.get());
async.run();
// we did crash before, so the async lease would still be valid, so we expect a failed run:
assertTrue(async.isFailing());
// let's wait until the lease is released - should be 500ms (2x lease timeout of 250ms)
// System.out.println("Looping... ");
final long timeout = System.currentTimeMillis() + 1000;
while(async.isFailing() && System.currentTimeMillis() < timeout) {
// let's be a bit CPU friendly..:
Thread.sleep(50);
// System.out.println(" Rechecking async index..., reindex count = " + reindexingCount.get());
async.run();
// System.out.println(" Result of async indexing: failing=" + async.isFailing() + " , reindex count = " + reindexingCount.get());
}
// the test timeout was hit
assertTrue(async.isFailing());
// but now should not have seen a complete reindex
assertEquals("reindex happened", 0, reindexingCount.get());
}
// OAK-1749
@Test
public void branchBaseOnCheckpoint() throws Exception {
final Semaphore retrieve = new Semaphore(1);
final Semaphore checkpoint = new Semaphore(0);
NodeStore store = new MemoryNodeStore() {
@Nullable
@Override
public NodeState retrieve(@NotNull String checkpoint) {
retrieve.acquireUninterruptibly();
try {
return super.retrieve(checkpoint);
} finally {
retrieve.release();
}
}
@NotNull
@Override
public String checkpoint(long lifetime, @NotNull Map<String, String> properties) {
try {
return super.checkpoint(lifetime, properties);
} finally {
checkpoint.release();
}
}
};
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo",
false, ImmutableSet.of("foo"), null, TYPE,
Collections.singletonMap(ASYNC_PROPERTY_NAME, "async"));
builder.child("test").setProperty("foo", "a");
builder.child("child");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider);
async.run();
builder = store.getRoot().builder();
builder.child("test").setProperty("foo", "b");
builder.child("child").setProperty("prop", "value");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
async.run();
}
});
// drain checkpoint permits
checkpoint.acquireUninterruptibly(checkpoint.availablePermits());
// block NodeStore.retrieve()
retrieve.acquireUninterruptibly();
t.start();
// wait until async update called checkpoint
retrieve.release();
checkpoint.acquireUninterruptibly();
builder = store.getRoot().builder();
builder.child("child").remove();
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// allow async update to proceed with NodeStore.retrieve()
retrieve.release();
t.join();
assertFalse(store.getRoot().hasChildNode("child"));
}
// OAK-1784
@Test
public void failOnConflict() throws Exception {
final Map<Thread, Semaphore> locks = Maps.newIdentityHashMap();
NodeStore store = new MemoryNodeStore() {
@NotNull
@Override
public NodeState merge(@NotNull NodeBuilder builder,
@NotNull CommitHook commitHook, @NotNull CommitInfo info)
throws CommitFailedException {
Semaphore s = locks.get(Thread.currentThread());
if (s != null) {
s.acquireUninterruptibly();
}
return super.merge(builder, commitHook, info);
}
};
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo",
false, ImmutableSet.of("foo"), null, TYPE,
Collections.singletonMap(ASYNC_PROPERTY_NAME, "async"));
builder.child("test").setProperty("foo", "a");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider);
async.run();
builder = store.getRoot().builder();
builder.child("test").setProperty("foo", "b");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
async.run();
}
});
Semaphore s = new Semaphore(0);
locks.put(t, s);
t.start();
// make some unrelated changes to trigger indexing
builder = store.getRoot().builder();
builder.setChildNode("dummy").setProperty("foo", "bar");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
while (!s.hasQueuedThreads()) {
Thread.yield();
}
// introduce a conflict
builder = store.getRoot().builder();
builder.getChildNode(INDEX_DEFINITIONS_NAME).getChildNode("foo")
.getChildNode(":index").child("a").remove();
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
s.release(100);
t.join();
builder = store.getRoot().builder();
assertNoConflictMarker(builder);
}
private void assertNoConflictMarker(NodeBuilder builder) {
for (String name : builder.getChildNodeNames()) {
if (name.equals(ConflictAnnotatingRebaseDiff.CONFLICT)) {
fail("conflict marker detected");
}
assertNoConflictMarker(builder.getChildNode(name));
}
}
/**
* OAK-1959, stale ref to checkpoint thorws the indexer into a reindexing
* loop
*/
@Test
public void recoverFromMissingCpRef() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
new AsyncIndexUpdate("async", store, provider).run();
checkPathExists(store.getRoot(), INDEX_DEFINITIONS_NAME, "rootIndex",
INDEX_CONTENT_NODE_NAME, "abc", "testRoot");
builder = store.getRoot().builder();
// change cp ref to point to a non-existing one
builder.child(ASYNC).setProperty("async", "faulty");
builder.child("testAnother").setProperty("foo", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
new AsyncIndexUpdate("async", store, provider).run();
checkPathExists(store.getRoot(), INDEX_DEFINITIONS_NAME, "rootIndex",
INDEX_CONTENT_NODE_NAME, "def", "testAnother");
}
@Test
public void cpCleanupNoChanges() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
// no changes on diff, no checkpoints left behind
async.run();
assertTrue(async.isFinished());
Set<String> checkpoints = newHashSet(store.listCheckpoints());
assertTrue("Expecting the initial checkpoint",
checkpoints.size() == 1);
assertEquals(store.getRoot().getChildNode(ASYNC)
.getString("async"), checkpoints.iterator().next());
async.run();
assertEquals("Expecting no checkpoint changes",
checkpoints, store.listCheckpoints());
}
@Test
public void cpCleanupWChanges() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String firstCp = store.listCheckpoints().iterator().next();
builder = store.getRoot().builder();
builder.child("testRoot").setProperty("foo", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String secondCp = store.listCheckpoints().iterator().next();
assertFalse("Store should keep only second checkpoint",
secondCp.equals(firstCp));
assertEquals(
secondCp,
store.getRoot().getChildNode(ASYNC)
.getString("async"));
}
@Test
public void cpCleanupWUnrelatedChanges() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String firstCp = store.listCheckpoints().iterator().next();
// add content that's hidden from indexing
builder = store.getRoot().builder();
builder.child("testRoot").child(":hidden");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String secondCp = store.listCheckpoints().iterator().next();
assertFalse("Store should keep only second checkpoint",
secondCp.equals(firstCp));
assertEquals(
secondCp,
store.getRoot().getChildNode(ASYNC)
.getString("async"));
}
@Test
public void cpCleanupWErrors() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
FaultyIndexEditorProvder provider = new FaultyIndexEditorProvder();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Error should have been triggered by the commit",
provider.isFailed());
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
// OAK-3054 failure reports
AsyncIndexStats stats = async.getIndexStats();
String since = stats.getFailingSince();
assertTrue(stats.isFailing());
assertEquals(1, stats.getConsecutiveFailedExecutions());
assertEquals(since, stats.getLatestErrorTime());
TimeUnit.MILLISECONDS.sleep(100);
async.run();
assertTrue(stats.isFailing());
assertEquals(2, stats.getConsecutiveFailedExecutions());
assertEquals(since, stats.getFailingSince());
assertNotEquals(since, stats.getLatestErrorTime());
stats.fixed();
assertFalse(stats.isFailing());
assertEquals(0, stats.getConsecutiveFailedExecutions());
assertEquals("", stats.getFailingSince());
}
@Test
public void cpCleanupNoRelease() throws Exception {
final MemoryNodeStore mns = new MemoryNodeStore();
final AtomicBoolean canRelease = new AtomicBoolean(false);
ProxyNodeStore store = new ProxyNodeStore() {
@Override
protected NodeStore getNodeStore() {
return mns;
}
@Override
public boolean release(String checkpoint) {
if (canRelease.get()) {
return super.release(checkpoint);
}
return false;
}
};
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
mns.listCheckpoints().size() == 0);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Expecting one checkpoint",
mns.listCheckpoints().size() == 1);
assertTrue(
"Expecting one temp checkpoint",
newHashSet(
store.getRoot().getChildNode(ASYNC)
.getStrings("async-temp")).size() == 1);
builder = store.getRoot().builder();
builder.child("testRoot").setProperty("foo", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertTrue("Expecting two checkpoints",
mns.listCheckpoints().size() == 2);
assertTrue(
"Expecting two temp checkpoints",
newHashSet(
store.getRoot().getChildNode(ASYNC)
.getStrings("async-temp")).size() == 2);
canRelease.set(true);
builder = store.getRoot().builder();
builder.child("testRoot").setProperty("foo", "ghi");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertTrue("Expecting one checkpoint",
mns.listCheckpoints().size() == 1);
String secondCp = mns.listCheckpoints().iterator().next();
assertEquals(
secondCp,
store.getRoot().getChildNode(ASYNC)
.getString("async"));
// the temp cps size is 2 now but the unreferenced checkpoints have been
// cleared from the store already
for (String cp : store.getRoot().getChildNode(ASYNC)
.getStrings("async-temp")) {
if (cp.equals(secondCp)) {
continue;
}
assertNull("Temp checkpoint was already cleared from store",
store.retrieve(cp));
}
}
// OAK-4826
@Test
public void cpCleanupOrphaned() throws Exception {
Clock clock = Clock.SIMPLE;
MemoryNodeStore store = new MemoryNodeStore();
// prepare index and initial content
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
IndexEditorProvider provider = new PropertyIndexEditorProvider();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String cp = store.listCheckpoints().iterator().next();
Map<String, String> info = store.checkpointInfo(cp);
builder = store.getRoot().builder();
builder.child("testRoot").setProperty("foo", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// wait until currentTimeMillis() changes. this ensures
// the created value for the checkpoint is different
// from the previous checkpoint.
clock.waitUntil(clock.getTime() + 1);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
cp = store.listCheckpoints().iterator().next();
// create a new checkpoint with the info from the first checkpoint
// this simulates an orphaned checkpoint that should be cleaned up.
// the created timestamp is set back in time because cleanup preserves
// checkpoints within the lease time frame.
Calendar c = Calendar.getInstance();
c.setTimeInMillis(clock.getTime() - 2 * async.getLeaseTimeOut());
info.put("created", ISO8601.format(c));
assertNotNull(store.checkpoint(TimeUnit.HOURS.toMillis(1), info));
assertTrue("Expecting two checkpoints",
store.listCheckpoints().size() == 2);
async.cleanUpCheckpoints();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
assertEquals(cp, store.listCheckpoints().iterator().next());
}
@Test
public void disableCheckpointCleanup() throws Exception {
String propertyName = "oak.async.checkpointCleanupIntervalMinutes";
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
try {
System.setProperty(propertyName, "-1");
final AtomicBoolean cleaned = new AtomicBoolean();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
void cleanUpCheckpoints() {
cleaned.set(true);
super.cleanUpCheckpoints();
}
};
async.run();
assertFalse(cleaned.get());
} finally {
System.clearProperty(propertyName);
}
}
/**
* OAK-2203 Test reindex behavior on an async index when the index provider is missing
* for a given type
*/
@Test
public void testReindexMissingProvider() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
String missingAsync = "missing-async";
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, missingAsync);
builder.child("testRoot").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate(missingAsync, store,
provider);
//first run, creates a checkpoint and a ref to it as the last indexed state
async.run();
assertFalse(async.isFailing());
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String firstCp = store.listCheckpoints().iterator().next();
assertEquals(
firstCp,
store.getRoot().getChildNode(ASYNC)
.getString(missingAsync));
// second run, simulate an index going away
provider = CompositeIndexEditorProvider
.compose(new ArrayList<IndexEditorProvider>());
async = new AsyncIndexUpdate(missingAsync, store, provider);
async.run();
assertTrue(async.isFailing());
// don't set reindex=true but skip the update
PropertyState reindex = store.getRoot()
.getChildNode(INDEX_DEFINITIONS_NAME).getChildNode("rootIndex")
.getProperty(REINDEX_PROPERTY_NAME);
assertTrue(reindex == null || !reindex.getValue(Type.BOOLEAN));
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String secondCp = store.listCheckpoints().iterator().next();
assertTrue("Store should not create a new checkpoint",
secondCp.equals(firstCp));
assertEquals(
firstCp,
store.getRoot().getChildNode(ASYNC)
.getString(missingAsync));
}
@Test
public void testReindexMissingProvider_NonRoot() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
String missingAsyncName = "missing-async";
createIndexDefinition(builder.child("subNodeIndex").child(INDEX_DEFINITIONS_NAME),
"rootIndex2", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, missingAsyncName);
builder.child("subNodeIndex").child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate(missingAsyncName, store, provider);
//first run, creates a checkpoint and a ref to it as the last indexed state
async.run();
assertFalse(async.isFailing());
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String firstCp = store.listCheckpoints().iterator().next();
assertEquals(firstCp, store.getRoot().getChildNode(ASYNC).getString(missingAsyncName));
builder = store.getRoot().builder();
builder.child("subNodeIndex").child("testRoot2").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// second run, simulate an index going away
provider = CompositeIndexEditorProvider.compose(new ArrayList<IndexEditorProvider>());
async = new AsyncIndexUpdate(missingAsyncName, store, provider);
async.run();
assertTrue(async.isFailing());
// don't set reindex=true but skip the update
NodeState rootIndex2 = NodeStateUtils.getNode(store.getRoot(), "/subNodeIndex/oak:index/rootIndex2");
assertTrue(rootIndex2.exists());
PropertyState reindex2 = rootIndex2.getProperty(REINDEX_PROPERTY_NAME);
assertTrue(reindex2 == null || !reindex2.getValue(Type.BOOLEAN));
assertTrue("Expecting one checkpoint",store.listCheckpoints().size() == 1);
String secondCp = store.listCheckpoints().iterator().next();
assertTrue("Store should not create a new checkpoint", secondCp.equals(firstCp));
assertEquals(firstCp, store.getRoot().getChildNode(ASYNC).getString(missingAsyncName));
}
private static class FaultyIndexEditorProvder implements
IndexEditorProvider {
private final FaultyIndexEditor faulty = new FaultyIndexEditor();
@Override
public Editor getIndexEditor(@NotNull String type, @NotNull NodeBuilder definition,
@NotNull NodeState root, @NotNull IndexUpdateCallback callback)
throws CommitFailedException {
return faulty;
}
public boolean isFailed() {
return faulty.failed;
}
}
private static class FaultyIndexEditor implements IndexEditor {
private boolean failed = false;
@Override
public void enter(NodeState before, NodeState after)
throws CommitFailedException {
failed = true;
throw new CommitFailedException("test", -1, "Testing failures");
}
@Override
public void leave(NodeState before, NodeState after)
throws CommitFailedException {
}
@Override
public void propertyAdded(PropertyState after)
throws CommitFailedException {
}
@Override
public void propertyChanged(PropertyState before, PropertyState after)
throws CommitFailedException {
}
@Override
public void propertyDeleted(PropertyState before)
throws CommitFailedException {
}
@Override
public Editor childNodeAdded(String name, NodeState after)
throws CommitFailedException {
return null;
}
@Override
public Editor childNodeChanged(String name, NodeState before,
NodeState after) throws CommitFailedException {
return null;
}
@Override
public Editor childNodeDeleted(String name, NodeState before)
throws CommitFailedException {
return null;
}
}
@Test
public void taskSplit() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"changedIndex", true, false, ImmutableSet.of("bar"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"ignored1", true, false, ImmutableSet.of("baz"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async-ignored");
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"ignored2", true, false, ImmutableSet.of("etc"), null);
builder.child("testRoot").setProperty("foo", "abc");
builder.child("testRoot").setProperty("bar", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String firstCp = store.listCheckpoints().iterator().next();
builder = store.getRoot().builder();
builder.child("testRoot").setProperty("foo", "def");
builder.child("testRoot").setProperty("bar", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
IndexTaskSpliter splitter = async.getTaskSplitter();
splitter.registerSplit(newHashSet("/oak:index/changedIndex"), "async-slow");
async.run();
Set<String> checkpoints = newHashSet(store.listCheckpoints());
assertTrue("Expecting two checkpoints",
checkpoints.size() == 2);
assertTrue(checkpoints.remove(firstCp));
String secondCp = checkpoints.iterator().next();
NodeState asyncNode = store.getRoot().getChildNode(
ASYNC);
assertEquals(firstCp, asyncNode.getString("async-slow"));
assertEquals(secondCp, asyncNode.getString("async"));
assertFalse(newHashSet(asyncNode.getStrings("async-temp")).contains(
firstCp));
NodeState indexNode = store.getRoot().getChildNode(
INDEX_DEFINITIONS_NAME);
assertEquals("async",
indexNode.getChildNode("rootIndex").getString("async"));
assertEquals("async-ignored", indexNode.getChildNode("ignored1")
.getString("async"));
assertNull(indexNode.getChildNode("ignored2").getString("async"));
assertEquals("async-slow", indexNode.getChildNode("changedIndex")
.getString("async"));
assertEquals(false,
indexNode.getChildNode("changedIndex").getBoolean("reindex"));
// new index task is on previous checkpoint
PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "abc"));
assertEquals(ImmutableSet.of(), find(lookup, "bar", "def"));
}
@Test
public void taskSplitNoMatch() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"ignored", true, false, ImmutableSet.of("baz"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async-ignored");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertTrue("Expecting one checkpoint",
store.listCheckpoints().size() == 1);
String firstCp = store.listCheckpoints().iterator().next();
builder = store.getRoot().builder();
builder.child("testRoot").setProperty("foo", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
IndexTaskSpliter splitter = async.getTaskSplitter();
// no match on the provided path
splitter.registerSplit(newHashSet("/oak:index/ignored"), "async-slow");
async.run();
Set<String> checkpoints = newHashSet(store.listCheckpoints());
assertTrue("Expecting a single checkpoint",
checkpoints.size() == 1);
String secondCp = checkpoints.iterator().next();
NodeState asyncNode = store.getRoot().getChildNode(
ASYNC);
assertEquals(secondCp, asyncNode.getString("async"));
assertNull(firstCp, asyncNode.getString("async-slow"));
}
@Test
public void testAsyncExecutionStats() throws Exception {
final Set<String> knownCheckpoints = Sets.newHashSet();
MemoryNodeStore store = new MemoryNodeStore(){
@Override
public synchronized NodeState retrieve(@NotNull String checkpoint) {
if (!knownCheckpoints.isEmpty() && !knownCheckpoints.contains(checkpoint)){
return null;
}
return super.retrieve(checkpoint);
}
};
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider, statsProvider, false);
runOneCycle(async);
assertEquals(1, async.getIndexStats().getExecutionStats().getExecutionCounter().getCount());
//Run a cycle so that change of reindex flag gets indexed
runOneCycle(async);
assertEquals(2, async.getIndexStats().getExecutionStats().getExecutionCounter().getCount());
long indexedNodeCount = async.getIndexStats().getExecutionStats().getIndexedNodeCount().getCount();
//Now run so that it results in an empty cycle
runOneCycle(async);
assertEquals(indexedNodeCount, async.getIndexStats().getExecutionStats().getIndexedNodeCount().getCount());
//Do some updates and counter should increase
builder = store.getRoot().builder();
builder.child("testRoot2").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
runOneCycle(async);
assertEquals(4, async.getIndexStats().getExecutionStats().getExecutionCounter().getCount());
//Do some updates but disable checkpoints. Counter should not increase
builder = store.getRoot().builder();
builder.child("testRoot3").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
//Disable new checkpoint retrieval
knownCheckpoints.addAll(store.listCheckpoints());
runOneCycle(async);
assertEquals(0, lastExecutionStats(async.getIndexStats().getExecutionCount()));
}
@Test
public void executionCountUpdatesOnRunWithoutAnyChangeInRepo() throws Exception {
AsyncIndexUpdate async = new AsyncIndexUpdate("async",
new MemoryNodeStore(),
new PropertyIndexEditorProvider(),
statsProvider, false);
long execCnt1 = async.getIndexStats().getTotalExecutionCount();
runOneCycle(async);
long execCnt2 = async.getIndexStats().getTotalExecutionCount();
runOneCycle(async);
long execCnt3 = async.getIndexStats().getTotalExecutionCount();
assertNotEquals("execCnt1 " + execCnt1 + " and execCnt2 " + execCnt2 + " are same", execCnt1, execCnt2);
assertNotEquals("execCnt2 " + execCnt2 + " and execCnt3 " + execCnt3 + " are same", execCnt2, execCnt3);
}
private static long lastExecutionStats(CompositeData cd){
//Last stat is the last entry in the array
return ((long[]) cd.get("per second"))[59];
}
private static void runOneCycle(AsyncIndexUpdate async){
async.run();
}
@Test
public void checkpointStability() throws Exception{
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
//Initial indexing
async.run();
//Now checkpoints = [checkpoints0]
//Index again so as to get change in reindex flag done
async.run();
//Now checkpoints = [checkpoints1]. checkpoints0 released
//Now make some changes to
builder = store.getRoot().builder();
builder.child("testRoot2").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
//Now checkpoints = [checkpoints1]. Note that size is 1 so new checkpoint name remains same
LogCustomizer customLogs = LogCustomizer.forLogger(AsyncIndexUpdate.class.getName())
.filter(Level.WARN)
.create();
builder = store.getRoot().builder();
builder.child("testRoot3").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
customLogs.starting();
async.run();
assertEquals(Collections.emptyList(), customLogs.getLogs());
customLogs.finished();
}
@Test
public void noRunWhenClosed() throws Exception{
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
async.close();
LogCustomizer lc = createLogCustomizer(Level.WARN);
async.run();
assertEquals(1, lc.getLogs().size());
assertThat(lc.getLogs().get(0), containsString("Could not acquire run permit"));
lc.finished();
async.close();
}
@Test
public void closeWithSoftLimit() throws Exception{
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final Semaphore asyncLock = new Semaphore(1);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
String beforeCheckpoint,
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
try {
asyncLock.acquire();
} catch (InterruptedException ignore) {
}
return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
indexStats, stopFlag);
}
};
async.setCloseTimeOut(1000);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
async.run();
}
});
Thread closer = new Thread(new Runnable() {
@Override
public void run() {
async.close();
}
});
asyncLock.acquire();
t.start();
//Wait till async gets to wait state i.e. inside run
while(!asyncLock.hasQueuedThreads());
LogCustomizer lc = createLogCustomizer(Level.DEBUG);
closer.start();
//Wait till closer is in waiting state
while(!async.isClosing());
//For softLimit case the flag should not be set
assertFalse(async.isClosed());
assertLogPhrase(lc.getLogs(), "[WAITING]");
//Let indexing run complete now
asyncLock.release();
//Wait for both threads
t.join();
closer.join();
//Close call should complete
assertLogPhrase(lc.getLogs(), "[CLOSED OK]");
}
@Test
public void closeWithHardLimit() throws Exception{
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final Semaphore asyncLock = new Semaphore(1);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
String beforeCheckpoint,
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
try {
asyncLock.acquire();
} catch (InterruptedException ignore) {
}
return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
indexStats, stopFlag);
}
};
//Set a 1 sec close timeout
async.setCloseTimeOut(1);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
async.run();
}
});
Thread closer = new Thread(new Runnable() {
@Override
public void run() {
async.close();
}
});
//Lock to ensure that AsyncIndexUpdate waits
asyncLock.acquire();
t.start();
//Wait till async gets to wait state i.e. inside run
while(!asyncLock.hasQueuedThreads());
LogCustomizer lc = createLogCustomizer(Level.DEBUG);
closer.start();
//Wait till stopFlag is set
while(!async.isClosed());
assertLogPhrase(lc.getLogs(), "[SOFT LIMIT HIT]");
//Let indexing run complete now
asyncLock.release();
//Wait for both threads
t.join();
//Async run would have exited with log message logged
assertLogPhrase(lc.getLogs(), "The index update interrupted");
//Wait for close call to complete
closer.join();
lc.finished();
}
@Test
public void abortedRun() throws Exception{
NodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
final Semaphore asyncLock = new Semaphore(1);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
String beforeCheckpoint,
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
indexStats, stopFlag){
@Override
public void indexUpdate() throws CommitFailedException {
try {
asyncLock.acquire();
} catch (InterruptedException ignore) {
}
try {
super.indexUpdate();
}finally {
asyncLock.release();
}
}
};
}
};
runOneCycle(async);
assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus());
//Below we ensure that we interrupt while the indexing is in progress
//hence the use of asyncLock which ensures the abort is called at right time
//Now make some changes to
builder = store.getRoot().builder();
builder.child("testRoot2").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
Thread t = new Thread(async);
//Lock to ensure that AsyncIndexUpdate waits
asyncLock.acquire();
t.start();
//Wait till async gets to wait state i.e. inside run
while(!asyncLock.hasQueuedThreads());
assertEquals(IndexStatsMBean.STATUS_RUNNING, async.getIndexStats().getStatus());
assertThat(async.getIndexStats().abortAndPause(), containsString("Abort request placed"));
asyncLock.release();
retry(5, 5, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return IndexStatsMBean.STATUS_INTERRUPTED.equals(async.getIndexStats().getStatus());
}
});
//Post abort indexing should be fine
runOneCycle(async);
assertTrue(async.getIndexStats().isPaused());
//Now resume indexing
async.getIndexStats().resume();
runOneCycle(async);
assertEquals(IndexStatsMBean.STATUS_DONE, async.getIndexStats().getStatus());
assertFalse(async.isClosed());
}
private void assertLogPhrase(List<String> logs, String logPhrase){
assertThat(logs.toString(), containsString(logPhrase));
}
private static LogCustomizer createLogCustomizer(Level level){
LogCustomizer lc = LogCustomizer.forLogger(AsyncIndexUpdate.class.getName())
.filter(level)
.enable(level)
.create();
lc.starting();
return lc;
}
private static void retry(int timeoutSeconds, int intervalBetweenTriesMsec, Callable<Boolean> c) {
long timeout = System.currentTimeMillis() + timeoutSeconds * 1000L;
while (System.currentTimeMillis() < timeout) {
try {
if (c.call()) {
return;
}
} catch (Exception ignore) {
}
try {
Thread.sleep(intervalBetweenTriesMsec);
} catch (InterruptedException ignore) {
}
}
fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: ");
}
@Test
public void greedyLeaseReindex() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider);
pre.run();
pre.close();
// rm all cps to simulate 'missing cp scenario'
for (String cp : store.listCheckpoints()) {
store.release(cp);
}
final AtomicBoolean greedyLease = new AtomicBoolean(false);
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(
NodeStore store, String name, long leaseTimeOut,
String beforeCheckpoint, AsyncIndexStats indexStats,
AtomicBoolean stopFlag) {
return new AsyncUpdateCallback(store, name, leaseTimeOut,
beforeCheckpoint, indexStats, stopFlag) {
@Override
protected void initLease() throws CommitFailedException {
greedyLease.set(true);
super.initLease();
}
@Override
protected void prepare(String afterCheckpoint)
throws CommitFailedException {
assertTrue(greedyLease.get());
super.prepare(afterCheckpoint);
}
};
}
};
async.run();
async.close();
assertTrue(greedyLease.get());
}
@Test
public void checkpointLostEventualConsistent() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
final List<NodeState> rootStates = Lists.newArrayList();
store.addObserver(new Observer() {
@Override
public void contentChanged(@NotNull NodeState root, @Nullable CommitInfo info) {
rootStates.add(root);
}
});
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider);
pre.run();
//Create another commit so that we have two checkpoints
builder = store.getRoot().builder();
builder.child("testRoot2").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
pre.run();
pre.close();
//Look for the nodestate just before the final merge in AsyncIndexUpdate
//i.e. where older checkpoint was still referred and which has been "released"
//post last run
Collections.reverse(rootStates);
final AtomicReference<NodeState> oldRootState = new AtomicReference<NodeState>();
for (NodeState ns : rootStates) {
NodeState async = ns.getChildNode(ASYNC);
String checkpointName = async.getString("async");
if (store.retrieve(checkpointName) == null &&
async.getProperty(AsyncIndexUpdate.leasify("async")) == null){
oldRootState.set(ns);
break;
}
}
assertNotNull(oldRootState.get());
final AtomicBoolean intiLeaseCalled = new AtomicBoolean(false);
//Here for the call to read existing NodeState we would return the old
//"stale" state where we have a stale checkpoint
store = new MemoryNodeStore(store.getRoot()) {
@Override
public NodeState getRoot() {
//Keep returning stale view untill initlease is not invoked
if (!intiLeaseCalled.get()) {
return oldRootState.get();
}
return super.getRoot();
}
};
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider){
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(
NodeStore store, String name, long leaseTimeOut,
String beforeCheckpoint, AsyncIndexStats indexStats,
AtomicBoolean stopFlag) {
return new AsyncUpdateCallback(store, name, leaseTimeOut,
beforeCheckpoint, indexStats, stopFlag) {
@Override
protected void initLease() throws CommitFailedException {
intiLeaseCalled.set(true);
super.initLease();
}
};
}
};
async.run();
//This run should fail
assertTrue(async.getIndexStats().isFailing());
async.close();
}
@Test
public void commitContext() throws Exception{
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
CommitInfoCollector infoCollector = new CommitInfoCollector();
store.addObserver(infoCollector);
async.run();
assertFalse(infoCollector.infos.isEmpty());
assertNotNull(infoCollector.infos.get(0).getInfo().get(CommitContext.NAME));
}
@Test
public void validatorProviderInvocation() throws Exception{
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
CollectingValidatorProvider v = new CollectingValidatorProvider();
async.setValidatorProviders(ImmutableList.<ValidatorProvider>of(v));
async.run();
assertFalse(v.visitedPaths.isEmpty());
assertThat(v.visitedPaths, hasItem("/:async"));
assertThat(v.visitedPaths, hasItem("/oak:index/rootIndex"));
}
@Test
public void longTimeFailingIndexMarkedAsCorrupt() throws Exception{
MemoryNodeStore store = new MemoryNodeStore();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"fooIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"barIndex", true, false, ImmutableSet.of("bar"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot1").setProperty("foo", "abc");
builder.child("testRoot2").setProperty("bar", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
TestIndexEditorProvider provider = new TestIndexEditorProvider();
Clock clock = new Clock.Virtual();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.getCorruptIndexHandler().setClock(clock);
async.run();
//1. Basic sanity check. Indexing works
PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
assertEquals(ImmutableSet.of("testRoot1"), find(lookup, "foo", "abc"));
assertEquals(ImmutableSet.of("testRoot2"), find(lookup, "bar", "abc"));
//2. Add some new content
builder = store.getRoot().builder();
builder.child("testRoot3").setProperty("foo", "xyz");
builder.child("testRoot4").setProperty("bar", "xyz");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
//3. Now fail the indexing for 'bar'
provider.enableFailureMode("/oak:index/barIndex");
async.run();
assertTrue(async.getIndexStats().isFailing());
//barIndex is failing but not yet considered corrupted
assertTrue(async.getCorruptIndexHandler().getFailingIndexData("async").containsKey("/oak:index/barIndex"));
assertFalse(async.getCorruptIndexHandler().getCorruptIndexData("async").containsKey("/oak:index/barIndex"));
CorruptIndexInfo barIndexInfo = async.getCorruptIndexHandler().getFailingIndexData("async").get("/oak:index/barIndex");
//fooIndex is fine
assertFalse(async.getCorruptIndexHandler().getFailingIndexData("async").containsKey("/oak:index/fooIndex"));
//lookup should also fail as indexing failed
lookup = new PropertyIndexLookup(store.getRoot());
assertTrue(find(lookup, "foo", "xyz").isEmpty());
assertTrue(find(lookup, "bar", "xyz").isEmpty());
//4.Now move the clock forward and let the failing index marked as corrupt
clock.waitUntil(clock.getTime() + async.getCorruptIndexHandler().getCorruptIntervalMillis() + 1);
//5. Let async run again
async.run();
//Indexing would be considered as failing
assertTrue(async.getIndexStats().isFailing());
assertEquals(IndexStatsMBean.STATUS_FAILING, async.getIndexStats().getStatus());
//barIndex should be considered corrupt now
assertTrue(async.getCorruptIndexHandler().getCorruptIndexData("async").containsKey("/oak:index/barIndex"));
lookup = new PropertyIndexLookup(store.getRoot());
//fooIndex should now report updated result. barIndex would fail
assertEquals(ImmutableSet.of("testRoot3"), find(lookup, "foo", "xyz"));
assertTrue(find(lookup, "bar", "xyz").isEmpty());
assertEquals(1, barIndexInfo.getSkippedCount());
//6. Index some stuff
builder = store.getRoot().builder();
builder.child("testRoot5").setProperty("foo", "pqr");
builder.child("testRoot6").setProperty("bar", "pqr");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertTrue(async.getIndexStats().isFailing());
//barIndex should be skipped
assertEquals(2, barIndexInfo.getSkippedCount());
//7. Lets reindex barIndex and ensure index is not misbehaving
provider.disableFailureMode();
builder = store.getRoot().builder();
builder.child("oak:index").child("barIndex").setProperty("reindex", true);
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
//now barIndex should not be part of failing index
assertFalse(async.getCorruptIndexHandler().getFailingIndexData("async").containsKey("/oak:index/barIndex"));
}
@Test
public void validName() throws Exception{
assertNotNull(AsyncIndexUpdate.checkValidName("async"));
assertNotNull(AsyncIndexUpdate.checkValidName("foo-async"));
assertNotNull(AsyncIndexUpdate.checkValidName(IndexConstants.ASYNC_REINDEX_VALUE));
try{
AsyncIndexUpdate.checkValidName(null);
fail();
} catch (Exception ignore){
}
try{
AsyncIndexUpdate.checkValidName("foo");
fail();
} catch (Exception ignore){
}
}
@Test
public void traversalCount() throws Exception{
MemoryNodeStore store = new MemoryNodeStore();
PropertyIndexEditorProvider provider = new PropertyIndexEditorProvider();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"rootIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot").setProperty("foo", "abc");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
//Get rid of changes in index nodes i.e. /oak:index/rootIndex
async.run();
//Do a run without any index property change
builder = store.getRoot().builder();
builder.child("a").child("b");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
AsyncIndexStats stats = async.getIndexStats();
assertEquals(3, stats.getNodesReadCount());
assertEquals(0, stats.getUpdates());
//Do a run with a index property change
builder = store.getRoot().builder();
builder.child("a").child("b").setProperty("foo", "bar");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
stats = async.getIndexStats();
assertEquals(3, stats.getNodesReadCount());
assertEquals(1, stats.getUpdates());
}
@Test
public void startTimePresentInCommitInfo() throws Exception{
MemoryNodeStore store = new MemoryNodeStore();
NodeBuilder builder = store.getRoot().builder();
createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
"fooIndex", true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, "async");
builder.child("testRoot1").setProperty("foo", "abc");
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
IndexingContextCapturingProvider provider = new IndexingContextCapturingProvider();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
assertNotNull(provider.lastIndexingContext);
CommitInfo info = provider.lastIndexingContext.getCommitInfo();
String indexStartTime = (String) info.getInfo().get(IndexConstants.CHECKPOINT_CREATION_TIME);
assertNotNull(indexStartTime);
}
// OAK-6864
@Test
public void disableSupersededIndex() throws Exception {
IndexEditorProvider propIdxEditorProvider = new PropertyIndexEditorProvider();
EditorHook propIdxHook = new EditorHook(new IndexUpdateProvider(propIdxEditorProvider));
MemoryNodeStore store = new MemoryNodeStore();
String supersededIndexName = "supersededIndex";
String supersedingIndexName = "supersedingIndex";
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, propIdxEditorProvider);
// Create superseded index def and merge it
NodeBuilder builder = store.getRoot().builder();
NodeBuilder oakIndex = builder.child(INDEX_DEFINITIONS_NAME);
createIndexDefinition(oakIndex, supersededIndexName, true, false, ImmutableSet.of("foo"), null);
store.merge(builder, propIdxHook, CommitInfo.EMPTY);
// Create superseding index def and merge it
builder = store.getRoot().builder();
oakIndex = builder.child(INDEX_DEFINITIONS_NAME);
createIndexDefinition(oakIndex, supersedingIndexName, true, false, ImmutableSet.of("foo"), null)
.setProperty(ASYNC_PROPERTY_NAME, ImmutableSet.of("async", "nrt"), Type.STRINGS)
.setProperty(SUPERSEDED_INDEX_PATHS, INDEX_DEFINITIONS_NAME + "/" + supersededIndexName)
;
store.merge(builder, propIdxHook, CommitInfo.EMPTY);
// add a change and index it thought superseded index
builder = store.getRoot().builder();
builder.child("testNode1").setProperty("foo", "bar");
store.merge(builder, propIdxHook, CommitInfo.EMPTY);
// verify state
NodeState supersededIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersededIndexName);
assertEquals("Index disabled too early", "property", supersededIndex.getString("type"));
assertFalse("Don't set :disableIndexesOnNextCycle on superseded index",
supersededIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
NodeState supersedingIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersedingIndexName);
assertFalse("Don't set :disableIndexesOnNextCycle on superseding index just yet",
supersedingIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
// do an async run - this should reindex the superseding index
async.run();
// verify state
supersededIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersededIndexName);
assertEquals("Index disabled too early", "property", supersededIndex.getString("type"));
assertFalse("Don't set :disableIndexesOnNextCycle on superseded index",
supersededIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
supersedingIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersedingIndexName);
assertTrue(":disableIndexesOnNextCycle not set on superseding index after reindexing run",
supersedingIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
// add another change and index it thought superseded index
builder = store.getRoot().builder();
store.getRoot().builder().child("testNode2").setProperty("foo", "bar");
store.merge(builder, propIdxHook, CommitInfo.EMPTY);
// verify state
supersededIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersededIndexName);
assertEquals("Index disabled too early", "property", supersededIndex.getString("type"));
assertFalse("Don't set :disableIndexesOnNextCycle on superseded index",
supersededIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
supersedingIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersedingIndexName);
assertTrue(":disableIndexesOnNextCycle not set on superseding index after reindexing run",
supersedingIndex.getBoolean(DISABLE_INDEXES_ON_NEXT_CYCLE));
// do another async run - indexes should get disabled now
async.run();
// verify state
supersededIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersededIndexName);
assertEquals("Index yet not disabled", "disabled", supersededIndex.getString("type"));
assertFalse("Don't set :disableIndexesOnNextCycle on superseded index",
supersededIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
supersedingIndex = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(supersedingIndexName);
assertFalse("Don't keep :disableIndexesOnNextCycle on superseding index after disabling",
supersedingIndex.hasProperty(DISABLE_INDEXES_ON_NEXT_CYCLE));
}
@Test
public void indexCommitCallback() throws Exception {
AtomicBoolean gotFailedCommit = new AtomicBoolean();
AtomicBoolean gotSuccessfulCommit = new AtomicBoolean();
AtomicBoolean shouldFail = new AtomicBoolean();
MemoryNodeStore store = new MemoryNodeStore();
AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, (type, definition, root, callback) -> {
IndexingContext indexingContext = ((ContextAwareCallback)callback).getIndexingContext();
indexingContext.registerIndexCommitCallback(indexProgress -> {
switch (indexProgress) {
case COMMIT_FAILED:
gotFailedCommit.set(true);
break;
case COMMIT_SUCCEDED:
gotSuccessfulCommit.set(true);
break;
}
});
if (shouldFail.get()) {
throw new CommitFailedException("indexer-fail", 1, "Explicitly failing while indexing");
}
return new DefaultEditor();
});
// Make index
NodeBuilder builder = store.getRoot().builder();
builder.child("oak:index").child("fooIndex").setProperty("async", "async").setProperty("type", "foo");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
// reset stats
gotFailedCommit.set(false);
gotSuccessfulCommit.set(false);
// make some change which should succeed
builder = store.getRoot().builder();
builder.setProperty("foo", "bar");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertTrue("Successful indexing commit must report success", gotSuccessfulCommit.get());
assertFalse("Successful indexing commit must not report failure", gotFailedCommit.get());
// reset stats
gotFailedCommit.set(false);
gotSuccessfulCommit.set(false);
// make another change that should fail
shouldFail.set(true);
builder = store.getRoot().builder();
builder.setProperty("foo", "bar1");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
async.run();
assertFalse("Failing indexing must not report success", gotSuccessfulCommit.get());
assertTrue("Failing indexing must report failure", gotFailedCommit.get());
}
private static class TestIndexEditorProvider extends PropertyIndexEditorProvider {
private String indexPathToFail;
@Override
public Editor getIndexEditor(@NotNull String type, @NotNull NodeBuilder definition, @NotNull NodeState root,
@NotNull IndexUpdateCallback callback) {
IndexingContext context = ((ContextAwareCallback)callback).getIndexingContext();
if (indexPathToFail != null && indexPathToFail.equals(context.getIndexPath())){
RuntimeException e = new RuntimeException();
context.indexUpdateFailed(e);
throw e;
}
return super.getIndexEditor(type, definition, root, callback);
}
public void enableFailureMode(String indexPathToFail){
this.indexPathToFail = indexPathToFail;
}
public void disableFailureMode(){
indexPathToFail = null;
}
}
private static class IndexingContextCapturingProvider extends PropertyIndexEditorProvider {
IndexingContext lastIndexingContext;
@Override
public Editor getIndexEditor(@NotNull String type, @NotNull NodeBuilder definition, @NotNull NodeState root,
@NotNull IndexUpdateCallback callback) {
lastIndexingContext = ((ContextAwareCallback)callback).getIndexingContext();
return super.getIndexEditor(type, definition, root, callback);
}
}
private static class CollectingValidatorProvider extends ValidatorProvider {
final Set<String> visitedPaths = Sets.newHashSet();
@Override
protected Validator getRootValidator(NodeState before, NodeState after, CommitInfo info) {
return new CollectingValidator("/");
}
public void reset(){
visitedPaths.clear();
}
private class CollectingValidator extends DefaultValidator {
private final String path;
public CollectingValidator(String path){
this.path = path;
}
@Override
public void enter(NodeState before, NodeState after) throws CommitFailedException {
visitedPaths.add(path);
super.enter(before, after);
}
@Override
public Validator childNodeAdded(String name, NodeState after) throws CommitFailedException {
return new CollectingValidator(PathUtils.concat(path, name));
}
@Override
public Validator childNodeChanged(String name, NodeState before, NodeState after) throws CommitFailedException {
return new CollectingValidator(PathUtils.concat(path, name));
}
@Override
public Validator childNodeDeleted(String name, NodeState before) throws CommitFailedException {
return new CollectingValidator(PathUtils.concat(path, name));
}
}
}
static class CommitInfoCollector implements Observer {
List<CommitInfo> infos = Lists.newArrayList();
@Override
public void contentChanged(@NotNull NodeState root, @NotNull CommitInfo info) {
if (info != CommitInfo.EMPTY_EXTERNAL){
infos.add(info);
}
}
void reset(){
infos.clear();
}
}
}