Handling checkpoint corruption in case of bookie crash

Descriptions of the changes in this PR:

Allow falling back to the previous checkpoint, on the assumption that a corrupt checkpoint means that checkpointing did not complete and the journal is intact.

### Motivation

Reliability and resilience of the table service.

### Changes

Allow falling back to the previous checkpoint, on the assumption that a corrupt checkpoint means that checkpointing did not complete and the journal is intact.

Master Issue: #2565



Reviewers: Surinder Singh, Enrico Olivelli <eolivelli@gmail.com>

This closes #2566 from dlg99/master-checkpoint-corruption
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
index 3764847..b26a2f0 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
@@ -20,6 +20,7 @@
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -123,6 +124,8 @@
                     latestCheckpointId = checkpointId;
                     latestCheckpoint = ckpt;
                 }
+            } catch (FileNotFoundException fnfe) {
+                log.error("Metadata is corrupt for the checkpoint {}. Skipping it.", checkpointId);
             }
         }
         return Pair.of(latestCheckpointId, latestCheckpoint);
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
index efdc4a4..3500ec5 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
 import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
@@ -44,6 +45,15 @@
 @Slf4j
 public class RocksdbCheckpointTask {
 
+    /**
+     * Error injection hook for checkpoint tests: receives a value and may throw {@code IOException}.
+     * @param <T> type of the value handed to the hook
+     */
+    @FunctionalInterface
+    public interface InjectedError<T> {
+        void accept(T t) throws IOException;
+    }
+
     private final String dbName;
     private final Checkpoint checkpoint;
     private final File checkpointDir;
@@ -52,6 +62,9 @@
     private final boolean removeLocalCheckpointAfterSuccessfulCheckpoint;
     private final boolean removeRemoteCheckpointsAfterSuccessfulCheckpoint;
 
+    // for testing only
+    private InjectedError<String> injectedError = (String checkpointId) -> {};
+
     public RocksdbCheckpointTask(String dbName,
                                  Checkpoint checkpoint,
                                  File checkpointDir,
@@ -67,6 +80,10 @@
         this.removeRemoteCheckpointsAfterSuccessfulCheckpoint = removeRemoteCheckpoints;
     }
 
+    public void setInjectedError(InjectedError<String> injectedError) {
+        this.injectedError = injectedError; // for testing only: replaces the default no-op hook
+    }
+
     public String checkpoint(byte[] txid) throws StateStoreException {
         String checkpointId = UUID.randomUUID().toString();
 
@@ -89,6 +106,8 @@
                 checkpointStore.createDirectories(sstsPath);
             }
 
+            injectedError.accept(checkpointId);
+
             // get the files to copy
             List<File> filesToCopy = getFilesToCopy(tempDir);
 
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
index e493579..40e3517 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
@@ -24,11 +24,13 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
@@ -36,6 +38,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 import lombok.Cleanup;
 import org.apache.bookkeeper.common.coder.StringUtf8Coder;
 import org.apache.bookkeeper.common.kv.KV;
@@ -413,4 +417,74 @@
         verifyNumKvs(100);
     }
 
+    /*
+     * A bookie can crash or get killed by an operator/automation at any point for any reason.
+     * This test covers the situation when this happens mid-checkpoint.
+     */
+    @Test
+    public void testCheckpointRestoreAfterCrash() throws Exception {
+
+        final int numGoodCheckpoints = 3;
+        createMultipleCheckpoints(numGoodCheckpoints, false, false);
+
+        final int numKvs = 100;
+        final String dbName = runtime.getMethodName();
+        final byte[] txid = runtime.getMethodName().getBytes(UTF_8);
+
+        // first prepare rocksdb with 100 kvs;
+        writeNumKvs(numKvs, 100 * numGoodCheckpoints);
+
+        // create a checkpoint with corrupt metadata
+        Checkpoint checkpoint = Checkpoint.create(store.getDb());
+
+        // checkpoint task; its run is interrupted by the error injected below
+        RocksdbCheckpointTask checkpointTask = new RocksdbCheckpointTask(
+                dbName,
+                checkpoint,
+                localCheckpointsDir,
+                checkpointStore,
+                false,
+                false);
+
+        // let's simulate the crash.
+        // crash happens after the createDirectories() succeeded but before
+        // the finalizeCheckpoint() completes.
+        final AtomicReference<String> idRef = new AtomicReference<>();
+        checkpointTask.setInjectedError((id) -> {
+            idRef.set(id);
+            throw new RuntimeException("test");
+        });
+
+        try {
+            checkpointTask.checkpoint(txid);
+            fail("expected RuntimeException");
+        } catch (RuntimeException se) {
+            // noop
+            // in a real-life case this is simply a crash,
+            // so the "finally" in checkpoint() won't run either
+        }
+
+        // remove local checkpointed dir
+        File checkpointedDir = new File(localCheckpointsDir, idRef.get());
+        MoreFiles.deleteRecursively(
+                Paths.get(checkpointedDir.getAbsolutePath()),
+                RecursiveDeleteOption.ALLOW_INSECURE);
+        assertFalse(checkpointedDir.exists());
+        store.close();
+
+        // restore the checkpoint
+        RocksCheckpointer.restore(dbName, localCheckpointsDir, checkpointStore);
+
+        // all of the following succeeds if the exception from RocksCheckpointer.restore
+        // is ignored
+
+        // make sure all the kvs are readable
+        store = new RocksdbKVStore<>();
+        store.init(spec);
+
+        verifyNumKvs((numGoodCheckpoints + 1) * numKvs);
+        writeNumKvs(numKvs, (numGoodCheckpoints + 1) * numKvs);
+        verifyNumKvs((numGoodCheckpoints + 2) * numKvs);
+    }
+
 }