OAK-9350 Index update: release the correct checkpoint

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1886459 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
index 66ce53c..75dbb5f 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
@@ -38,6 +38,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
@@ -574,7 +575,7 @@
             return;
         }
 
-        String checkpointToRelease = afterCheckpoint;
+        AtomicReference<String> checkpointToReleaseRef = new AtomicReference<>(afterCheckpoint);
         boolean updatePostRunStatus = false;
         try {
             String newThreadName = "async-index-update-" + name;
@@ -582,17 +583,17 @@
             threadNameChanged = true;
             Thread.currentThread().setName(newThreadName);
             updatePostRunStatus = updateIndex(before, beforeCheckpoint, after,
-                    afterCheckpoint, afterTime, callback);
+                    afterCheckpoint, afterTime, callback, checkpointToReleaseRef);
 
             // the update succeeded, i.e. it no longer fails
             if (indexStats.didLastIndexingCycleFailed()) {
                 indexStats.fixed();
             }
 
-            // the update succeeded, so we can release the earlier checkpoint
+            // the update succeeded, so we are sure we can release the earlier checkpoint -
             // otherwise the new checkpoint associated with the failed update
-            // will get released in the finally block
-            checkpointToRelease = beforeCheckpoint;
+            // may still get released in the finally block (depending on where the index update failed)
+            checkpointToReleaseRef.set(beforeCheckpoint);
             indexStats.setReferenceCheckpoint(afterCheckpoint);
             indexStats.setProcessedCheckpoint("");
             indexStats.releaseTempCheckpoint(afterCheckpoint);
@@ -607,6 +608,7 @@
             }
             // null during initial indexing
             // and skip release if this cp was used in a split operation
+            String checkpointToRelease = checkpointToReleaseRef.get();
             if (checkpointToRelease != null
                     && !checkpointToRelease.equals(taskSplitter
                     .getLastReferencedCp())) {
@@ -758,7 +760,8 @@
 
     protected boolean updateIndex(NodeState before, String beforeCheckpoint,
                                   NodeState after, String afterCheckpoint, String afterTime,
-                                  AsyncUpdateCallback callback) throws CommitFailedException {
+                                  AsyncUpdateCallback callback,
+                                  AtomicReference<String> checkpointToReleaseRef) throws CommitFailedException {
         Stopwatch watch = Stopwatch.createStarted();
         boolean updatePostRunStatus = true;
         boolean progressLogged = false;
@@ -827,6 +830,9 @@
             }
             mergeWithConcurrencyCheck(store, validatorProviders, builder, beforeCheckpoint,
                     callback.lease, name);
+            // we successfully merged the change that updated the lane to the
+            // afterCheckpoint - so we need to release the beforeCheckpoint now
+            checkpointToReleaseRef.set(beforeCheckpoint);
             indexingFailed = false;
 
             if (indexUpdate.isReindexingPerformed()) {
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
index 7f34fb8..1fdcce4 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
@@ -41,10 +41,8 @@
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -52,12 +50,11 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.openmbean.CompositeData;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -71,6 +68,7 @@
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyValues;
 import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
 import org.apache.jackrabbit.oak.query.index.FilterImpl;
 import org.apache.jackrabbit.oak.spi.commit.CommitContext;
@@ -84,7 +82,6 @@
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.commit.Validator;
 import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
-import org.apache.jackrabbit.oak.plugins.memory.PropertyValues;
 import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -96,15 +93,18 @@
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.After;
-import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
-import ch.qos.logback.classic.Level;
-
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import ch.qos.logback.classic.Level;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
+
 public class AsyncIndexUpdateTest {
     private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
     private MetricStatisticsProvider statsProvider =
@@ -140,7 +140,7 @@
      * <li>Add some content</li>
      * <li>Search & verify</li>
      * </ul>
-     * 
+     *
      */
     @Test
     public void testAsync() throws Exception {
@@ -177,7 +177,7 @@
      * <li>Add some content</li>
      * <li>Search & verify</li>
      * </ul>
-     * 
+     *
      */
     @Test
     public void testAsyncDouble() throws Exception {
@@ -227,7 +227,7 @@
      * <li>Add some content</li>
      * <li>Search & verify</li>
      * </ul>
-     * 
+     *
      */
     @Test
     public void testAsyncDoubleSubtree() throws Exception {
@@ -307,6 +307,145 @@
         assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
     }
 
+    @Test
+    // this test is very sensitive to timing
+    // - effectively it can only be run in standalone mode
+    @Ignore
+    public void disposeWhileIndexing() throws Exception {
+        final Semaphore merge = new Semaphore(0);
+        final AtomicBoolean isDisposed = new AtomicBoolean(false);
+        final Semaphore disposed = new Semaphore(0);
+        NodeStore store = new MemoryNodeStore() {
+
+            private void checkClosed() {
+                if (isDisposed.get()) {
+                    // System.out.println("  Throwing \"This NodeStore is disposed\" " + this);
+                    throw new IllegalStateException("This NodeStore is disposed");
+                }
+            }
+
+            @NotNull
+            @Override
+            public String checkpoint(long lifetime, @NotNull Map<String, String> properties) {
+                checkClosed();
+                final String checkpoint = super.checkpoint(lifetime, properties);
+                // System.out.println("  Created checkpoint " + checkpoint);
+                return checkpoint;
+            }
+
+            @Override
+            public synchronized NodeState merge(@NotNull NodeBuilder builder, @NotNull CommitHook commitHook, @NotNull CommitInfo info)
+                    throws CommitFailedException {
+                checkClosed();
+                try {
+                    return super.merge(builder, commitHook, info);
+                } finally {
+                    // System.out.println("  Store after merge: " + this);
+                    if (!merge.tryAcquire()) {
+                        isDisposed.set(true);
+                        disposed.release();
+                        // System.out.println("  Now the disposed flag is set (after merge)");
+                    }
+                }
+            }
+
+            @Override
+            public synchronized boolean release(String checkpoint) {
+                // currently no checkClosed is done for DocumentNodeStore here,
+                // so in this test variant we're also not doing it here (to reproduce the bug).
+                try {
+                    return super.release(checkpoint);
+                } finally {
+                    // System.out.println("  Released checkpoint " + checkpoint);
+                }
+            }
+        };
+        merge.release(100);
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo",
+                false, ImmutableSet.of("foo"), null, TYPE,
+                Collections.singletonMap(ASYNC_PROPERTY_NAME, "async"));
+        builder.child("test").setProperty("foo", "a");
+        builder.child("child");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        final AtomicInteger reindexingCount = new AtomicInteger(0);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
+                provider) {
+            @Override
+            protected boolean updateIndex(NodeState before, String beforeCheckpoint, NodeState after, String afterCheckpoint,
+                    String afterTime, AsyncUpdateCallback callback, AtomicReference<String> checkpointToRelease) throws CommitFailedException {
+                if (before == MISSING_NODE) {
+                    reindexingCount.incrementAndGet();
+                }
+                return super.updateIndex(before, beforeCheckpoint, after, afterCheckpoint, afterTime, callback, checkpointToRelease);
+            }
+        };
+        async.setLeaseTimeOut(250);
+        async.run();
+
+        builder = store.getRoot().builder();
+        builder.child("test").setProperty("foo", "b");
+        builder.child("child").setProperty("prop", "value");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        merge.drainPermits();
+        merge.release(2);
+
+        // the first async.run() was a reindexing, but we're not counting that, so:
+        reindexingCount.set(0);
+
+        // System.out.println("Going for an async index run, reindex count = " + reindexingCount.get());
+
+        // with only 2 permits for the 'merge' semaphore,
+        // ie 2 calls to merge(), the async.run() below
+        // will hit the disposed NodeStore and trigger the bug.
+        // however, it will have released the checkpoint, as that
+        // is unprotected. so a subsequent call to async.run()
+        // will fail
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    async.run();
+                } catch(Throwable th) {
+                    // th.printStackTrace();
+                }
+            }
+        });
+        t.start();
+        t.join(5000);
+
+        // System.out.println("Async index run done - restarting NodeStore, reindex count = " + reindexingCount.get());
+
+        // lets resume the NodeStore
+        merge.release(1000);
+        isDisposed.set(false);
+
+        // and rerun the async
+        // System.out.println("and going for another async index run, reindex count = " + reindexingCount.get());
+        async.run();
+        // we did crash before, so the async lease would still be valid, so we expect a failed run:
+        assertTrue(async.isFailing());
+
+        // let's wait until the lease is released - should be 500ms (2x lease timeout of 250ms)
+        // System.out.println("Looping... ");
+        final long timeout = System.currentTimeMillis() + 1000;
+        while(async.isFailing() && System.currentTimeMillis() < timeout) {
+            // let's be a bit CPU friendly..:
+            Thread.sleep(50);
+            // System.out.println("  Rechecking async index..., reindex count = " + reindexingCount.get());
+            async.run();
+            // System.out.println("  Result of async indexing: failing=" + async.isFailing() + " , reindex count = " + reindexingCount.get());
+        }
+        // the test timeout was hit
+        assertTrue(async.isFailing());
+
+        // but now should not have seen a complete reindex
+        assertEquals("reindex happened", 0, reindexingCount.get());
+    }
+
     // OAK-1749
     @Test
     public void branchBaseOnCheckpoint() throws Exception {
diff --git a/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java b/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
index 8a6bc4d..f9960b7 100644
--- a/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
+++ b/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -142,12 +143,12 @@
             protected boolean updateIndex(NodeState before,
                     String beforeCheckpoint, NodeState after,
                     String afterCheckpoint, String afterTime,
-                    AsyncUpdateCallback callback) throws CommitFailedException {
+                    AsyncUpdateCallback callback, AtomicReference<String> checkpointToReleaseRef) throws CommitFailedException {
                 if (MISSING_NODE == before) {
                     l.reindexing();
                 }
                 return super.updateIndex(before, beforeCheckpoint, after,
-                        afterCheckpoint, afterTime, callback);
+                        afterCheckpoint, afterTime, callback, checkpointToReleaseRef);
             }
         };
         aiu.setCloseTimeOut(1);