OAK-9350 Index update: release the correct checkpoint
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1886459 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
index 66ce53c..75dbb5f 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
@@ -38,6 +38,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -574,7 +575,7 @@
return;
}
- String checkpointToRelease = afterCheckpoint;
+ AtomicReference<String> checkpointToReleaseRef = new AtomicReference<>(afterCheckpoint);
boolean updatePostRunStatus = false;
try {
String newThreadName = "async-index-update-" + name;
@@ -582,17 +583,17 @@
threadNameChanged = true;
Thread.currentThread().setName(newThreadName);
updatePostRunStatus = updateIndex(before, beforeCheckpoint, after,
- afterCheckpoint, afterTime, callback);
+ afterCheckpoint, afterTime, callback, checkpointToReleaseRef);
// the update succeeded, i.e. it no longer fails
if (indexStats.didLastIndexingCycleFailed()) {
indexStats.fixed();
}
- // the update succeeded, so we can release the earlier checkpoint
+ // the update succeeded, so we are sure we can release the earlier checkpoint -
// otherwise the new checkpoint associated with the failed update
- // will get released in the finally block
- checkpointToRelease = beforeCheckpoint;
+ // may still get released in the finally block (depending on where the index update failed)
+ checkpointToReleaseRef.set(beforeCheckpoint);
indexStats.setReferenceCheckpoint(afterCheckpoint);
indexStats.setProcessedCheckpoint("");
indexStats.releaseTempCheckpoint(afterCheckpoint);
@@ -607,6 +608,7 @@
}
// null during initial indexing
// and skip release if this cp was used in a split operation
+ String checkpointToRelease = checkpointToReleaseRef.get();
if (checkpointToRelease != null
&& !checkpointToRelease.equals(taskSplitter
.getLastReferencedCp())) {
@@ -758,7 +760,8 @@
protected boolean updateIndex(NodeState before, String beforeCheckpoint,
NodeState after, String afterCheckpoint, String afterTime,
- AsyncUpdateCallback callback) throws CommitFailedException {
+ AsyncUpdateCallback callback,
+ AtomicReference<String> checkpointToReleaseRef) throws CommitFailedException {
Stopwatch watch = Stopwatch.createStarted();
boolean updatePostRunStatus = true;
boolean progressLogged = false;
@@ -827,6 +830,9 @@
}
mergeWithConcurrencyCheck(store, validatorProviders, builder, beforeCheckpoint,
callback.lease, name);
+ // we successfully merged the change that updated the lane to the
+ // afterCheckpoint - so we need to release the beforeCheckpoint now
+ checkpointToReleaseRef.set(beforeCheckpoint);
indexingFailed = false;
if (indexUpdate.isReindexingPerformed()) {
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
index 7f34fb8..1fdcce4 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
@@ -41,10 +41,8 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@@ -52,12 +50,11 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.CompositeData;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
@@ -71,6 +68,7 @@
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyValues;
import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.query.index.FilterImpl;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
@@ -84,7 +82,6 @@
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.commit.Validator;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
-import org.apache.jackrabbit.oak.plugins.memory.PropertyValues;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -96,15 +93,18 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
-import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
-import ch.qos.logback.classic.Level;
-
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import ch.qos.logback.classic.Level;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
+
public class AsyncIndexUpdateTest {
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private MetricStatisticsProvider statsProvider =
@@ -140,7 +140,7 @@
* <li>Add some content</li>
* <li>Search & verify</li>
* </ul>
- *
+ *
*/
@Test
public void testAsync() throws Exception {
@@ -177,7 +177,7 @@
* <li>Add some content</li>
* <li>Search & verify</li>
* </ul>
- *
+ *
*/
@Test
public void testAsyncDouble() throws Exception {
@@ -227,7 +227,7 @@
* <li>Add some content</li>
* <li>Search & verify</li>
* </ul>
- *
+ *
*/
@Test
public void testAsyncDoubleSubtree() throws Exception {
@@ -307,6 +307,145 @@
assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
}
+ @Test
+ // this test is very sensitive to timing
+ // - effectively it can only be run in standalone mode
+ @Ignore
+ public void disposeWhileIndexing() throws Exception {
+ final Semaphore merge = new Semaphore(0);
+ final AtomicBoolean isDisposed = new AtomicBoolean(false);
+ final Semaphore disposed = new Semaphore(0);
+ NodeStore store = new MemoryNodeStore() {
+
+ private void checkClosed() {
+ if (isDisposed.get()) {
+ // System.out.println(" Throwing \"This NodeStore is disposed\" " + this);
+ throw new IllegalStateException("This NodeStore is disposed");
+ }
+ }
+
+ @NotNull
+ @Override
+ public String checkpoint(long lifetime, @NotNull Map<String, String> properties) {
+ checkClosed();
+ final String checkpoint = super.checkpoint(lifetime, properties);
+ // System.out.println(" Created checkpoint " + checkpoint);
+ return checkpoint;
+ }
+
+ @Override
+ public synchronized NodeState merge(@NotNull NodeBuilder builder, @NotNull CommitHook commitHook, @NotNull CommitInfo info)
+ throws CommitFailedException {
+ checkClosed();
+ try {
+ return super.merge(builder, commitHook, info);
+ } finally {
+ // System.out.println(" Store after merge: " + this);
+ if (!merge.tryAcquire()) {
+ isDisposed.set(true);
+ disposed.release();
+ // System.out.println(" Now the disposed flag is set (after merge)");
+ }
+ }
+ }
+
+ @Override
+ public synchronized boolean release(String checkpoint) {
+ // currently no checkClosed is done for DocumentNodeStore here,
+ // so in this test variant we're also not doing it here (to reproduce the bug).
+ try {
+ return super.release(checkpoint);
+ } finally {
+ // System.out.println(" Released checkpoint " + checkpoint);
+ }
+ }
+ };
+ merge.release(100);
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo",
+ false, ImmutableSet.of("foo"), null, TYPE,
+ Collections.singletonMap(ASYNC_PROPERTY_NAME, "async"));
+ builder.child("test").setProperty("foo", "a");
+ builder.child("child");
+
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ final AtomicInteger reindexingCount = new AtomicInteger(0);
+ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
+ provider) {
+ @Override
+ protected boolean updateIndex(NodeState before, String beforeCheckpoint, NodeState after, String afterCheckpoint,
+ String afterTime, AsyncUpdateCallback callback, AtomicReference<String> checkpointToRelease) throws CommitFailedException {
+ if (before == MISSING_NODE) {
+ reindexingCount.incrementAndGet();
+ }
+ return super.updateIndex(before, beforeCheckpoint, after, afterCheckpoint, afterTime, callback, checkpointToRelease);
+ }
+ };
+ async.setLeaseTimeOut(250);
+ async.run();
+
+ builder = store.getRoot().builder();
+ builder.child("test").setProperty("foo", "b");
+ builder.child("child").setProperty("prop", "value");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ merge.drainPermits();
+ merge.release(2);
+
+ // the first async.run() was a reindexing, but we're not counting that, so:
+ reindexingCount.set(0);
+
+ // System.out.println("Going for an async index run, reindex count = " + reindexingCount.get());
+
+ // with only 2 permits for the 'merge' semaphore,
+ // i.e. 2 calls to merge(), the async.run() below
+ // will hit the disposed NodeStore and trigger the bug.
+ // However, it will have released the checkpoint, as that
+ // is unprotected, so a subsequent call to async.run()
+ // will fail.
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ async.run();
+ } catch(Throwable th) {
+ // th.printStackTrace();
+ }
+ }
+ });
+ t.start();
+ t.join(5000);
+
+ // System.out.println("Async index run done - restarting NodeStore, reindex count = " + reindexingCount.get());
+
+ // let's resume the NodeStore
+ merge.release(1000);
+ isDisposed.set(false);
+
+ // and rerun the async
+ // System.out.println("and going for another async index run, reindex count = " + reindexingCount.get());
+ async.run();
+ // we did crash before, so the async lease would still be valid, so we expect a failed run:
+ assertTrue(async.isFailing());
+
+ // let's wait until the lease is released - should be 500ms (2x lease timeout of 250ms)
+ // System.out.println("Looping... ");
+ final long timeout = System.currentTimeMillis() + 1000;
+ while(async.isFailing() && System.currentTimeMillis() < timeout) {
+ // let's be a bit CPU friendly..:
+ Thread.sleep(50);
+ // System.out.println(" Rechecking async index..., reindex count = " + reindexingCount.get());
+ async.run();
+ // System.out.println(" Result of async indexing: failing=" + async.isFailing() + " , reindex count = " + reindexingCount.get());
+ }
+ // the loop is expected to have ended because the test timeout was hit, i.e. async is still failing
+ assertTrue(async.isFailing());
+
+ // but now should not have seen a complete reindex
+ assertEquals("reindex happened", 0, reindexingCount.get());
+ }
+
// OAK-1749
@Test
public void branchBaseOnCheckpoint() throws Exception {
diff --git a/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java b/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
index 8a6bc4d..f9960b7 100644
--- a/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
+++ b/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
@@ -30,6 +30,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
@@ -142,12 +143,12 @@
protected boolean updateIndex(NodeState before,
String beforeCheckpoint, NodeState after,
String afterCheckpoint, String afterTime,
- AsyncUpdateCallback callback) throws CommitFailedException {
+ AsyncUpdateCallback callback, AtomicReference<String> checkpointToReleaseRef) throws CommitFailedException {
if (MISSING_NODE == before) {
l.reindexing();
}
return super.updateIndex(before, beforeCheckpoint, after,
- afterCheckpoint, afterTime, callback);
+ afterCheckpoint, afterTime, callback, checkpointToReleaseRef);
}
};
aiu.setCloseTimeOut(1);