Handling checkpoint corruption in case of bookie crash
Descriptions of the changes in this PR:
Allowed fallback to the previous checkpoint, assuming that a corrupt checkpoint means that checkpointing did not complete and the journal is intact.
### Motivation
Reliability and resilience of the table service.
### Changes
Allowed fallback to the previous checkpoint, assuming that a corrupt checkpoint means that checkpointing did not complete and the journal is intact.
Master Issue: #2565
Reviewers: Surinder Singh, Enrico Olivelli <eolivelli@gmail.com>
This closes #2566 from dlg99/master-checkpoint-corruption
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
index 3764847..b26a2f0 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointer.java
@@ -20,6 +20,7 @@
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@@ -123,6 +124,8 @@
latestCheckpointId = checkpointId;
latestCheckpoint = ckpt;
}
+ } catch (FileNotFoundException fnfe) {
+ log.error("Metadata is corrupt for the checkpoint {}. Skipping it.", checkpointId);
}
}
return Pair.of(latestCheckpointId, latestCheckpoint);
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
index efdc4a4..3500ec5 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksdbCheckpointTask.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
@@ -44,6 +45,15 @@
@Slf4j
public class RocksdbCheckpointTask {
+ /**
+ * Error injection support for testing of the checkpoint; the callback may throw to simulate a failure.
+ * @param <T> type of the value handed to {@code accept}
+ */
+ @FunctionalInterface
+ public interface InjectedError<T> {
+ void accept(T t) throws IOException;
+ }
+
private final String dbName;
private final Checkpoint checkpoint;
private final File checkpointDir;
@@ -52,6 +62,9 @@
private final boolean removeLocalCheckpointAfterSuccessfulCheckpoint;
private final boolean removeRemoteCheckpointsAfterSuccessfulCheckpoint;
+ // for testing only
+ private InjectedError<String> injectedError = (String checkpointId) -> {};
+
public RocksdbCheckpointTask(String dbName,
Checkpoint checkpoint,
File checkpointDir,
@@ -67,6 +80,10 @@
this.removeRemoteCheckpointsAfterSuccessfulCheckpoint = removeRemoteCheckpoints;
}
+ public void setInjectedError(InjectedError<String> injectedError) { // test-only hook; callback is invoked with the checkpoint id
+ this.injectedError = injectedError; // replaces the default no-op callback
+ }
+
public String checkpoint(byte[] txid) throws StateStoreException {
String checkpointId = UUID.randomUUID().toString();
@@ -89,6 +106,8 @@
checkpointStore.createDirectories(sstsPath);
}
+ injectedError.accept(checkpointId);
+
// get the files to copy
List<File> filesToCopy = getFilesToCopy(tempDir);
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
index e493579..40e3517 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/RocksCheckpointerTest.java
@@ -24,11 +24,13 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -36,6 +38,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
import lombok.Cleanup;
import org.apache.bookkeeper.common.coder.StringUtf8Coder;
import org.apache.bookkeeper.common.kv.KV;
@@ -413,4 +417,74 @@
verifyNumKvs(100);
}
+ /*
+ A bookie can crash or get killed by an operator/automation at any point, for any reason.
+ This test covers the case where that happens mid-checkpoint and a restore is attempted afterwards.
+ */
+ @Test
+ public void testCheckpointRestoreAfterCrash() throws Exception {
+
+ final int numGoodCheckpoints = 3;
+ createMultipleCheckpoints(numGoodCheckpoints, false, false);
+
+ final int numKvs = 100;
+ final String dbName = runtime.getMethodName();
+ final byte[] txid = runtime.getMethodName().getBytes(UTF_8);
+
+ // write numKvs (100) more kvs; the second argument is presumably the starting key index - confirm against writeNumKvs
+ writeNumKvs(numKvs, 100 * numGoodCheckpoints);
+
+ // create the checkpoint that will be interrupted below (its stored metadata presumably ends up incomplete)
+ Checkpoint checkpoint = Checkpoint.create(store.getDb());
+
+ // build the checkpoint task for that checkpoint
+ RocksdbCheckpointTask checkpointTask = new RocksdbCheckpointTask(
+ dbName,
+ checkpoint,
+ localCheckpointsDir,
+ checkpointStore,
+ false,
+ false);
+
+ // let's simulate the crash.
+ // the crash happens after createDirectories() succeeded but before
+ // finalizeCheckpoint() completes.
+ final AtomicReference<String> idRef = new AtomicReference<>();
+ checkpointTask.setInjectedError((id) -> {
+ idRef.set(id);
+ throw new RuntimeException("test");
+ });
+
+ try {
+ checkpointTask.checkpoint(txid);
+ fail("expected RuntimeException");
+ } catch (RuntimeException se) {
+ // noop
+ // in a real-life case this is simply a crash,
+ // so the "finally" in checkpoint() won't run either
+ }
+
+ // remove the local directory of the interrupted checkpoint (id captured by the injected callback)
+ File checkpointedDir = new File(localCheckpointsDir, idRef.get());
+ MoreFiles.deleteRecursively(
+ Paths.get(checkpointedDir.getAbsolutePath()),
+ RecursiveDeleteOption.ALLOW_INSECURE);
+ assertFalse(checkpointedDir.exists());
+ store.close();
+
+ // restore from the checkpoint store; this call must not throw for the test to pass
+ RocksCheckpointer.restore(dbName, localCheckpointsDir, checkpointStore);
+
+ // all of the following succeeds if the exception from RocksCheckpointer.restore
+ // is ignored
+
+ // make sure all the kvs are readable
+ store = new RocksdbKVStore<>();
+ store.init(spec);
+
+ verifyNumKvs((numGoodCheckpoints + 1) * numKvs);
+ writeNumKvs(numKvs, (numGoodCheckpoints + 1) * numKvs);
+ verifyNumKvs((numGoodCheckpoints + 2) * numKvs);
+ }
+
}