blob: 26cef9a39115090e5cc824ac2c25713fa917162e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import lombok.Cleanup;
import org.apache.bookkeeper.common.coder.StringUtf8Coder;
import org.apache.bookkeeper.common.kv.KV;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
import org.apache.bookkeeper.statelib.api.kv.KVIterator;
import org.apache.bookkeeper.statelib.impl.kv.RocksdbKVStore;
import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
import org.apache.bookkeeper.stream.proto.kv.store.CheckpointMetadata;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.rocksdb.Checkpoint;
/**
* Unit test of {@link RocksCheckpointer}.
*/
public class RocksCheckpointerTest {
@Rule
public final TestName runtime = new TestName();
@Rule
public final TemporaryFolder testDir = new TemporaryFolder();
private File localDir;
private File localCheckpointsDir;
private File remoteDir;
private StateStoreSpec spec;
private RocksdbKVStore<String, String> store;
private CheckpointStore checkpointStore;
@Before
public void setUp() throws Exception {
localDir = testDir.newFolder("local");
localCheckpointsDir = new File(localDir, "checkpoints");
assertTrue(
"Not able to create checkpoints directory",
localCheckpointsDir.mkdir());
remoteDir = testDir.newFolder("remote");
checkpointStore = new FSCheckpointManager(remoteDir);
spec = StateStoreSpec.builder()
.name(runtime.getMethodName())
.keyCoder(StringUtf8Coder.of())
.valCoder(StringUtf8Coder.of())
.localStateStoreDir(localDir)
.stream(runtime.getMethodName())
.build();
store = new RocksdbKVStore<>();
store.init(spec);
}
@After
public void tearDown() throws Exception {
if (null != store) {
store.close();
}
if (null != checkpointStore) {
checkpointStore.close();
}
}
private static String getKey(int i) {
return String.format("key-%06d", i);
}
private static String getValue(int i) {
return String.format("val-%06d", i);
}
private void writeNumKvs(int numKvs, int startIdx) throws Exception {
for (int i = 0; i < numKvs; i++) {
String key = getKey(startIdx + i);
String val = getValue(startIdx + i);
store.put(key, val);
}
store.flush();
}
private void verifyNumKvs(int expectedNumKvs) throws Exception {
try (KVIterator<String, String> iter = store.range(null, null)) {
int idx = 0;
while (iter.hasNext()) {
KV<String, String> kv = iter.next();
assertEquals(getKey(idx), kv.key());
assertEquals(getValue(idx), kv.value());
++idx;
}
assertEquals(expectedNumKvs, idx);
}
}
private void verifyCheckpointMetadata(File checkpointedDir,
CheckpointMetadata metadata) {
String[] files = checkpointedDir.list();
assertNotNull(files);
assertEquals(files.length, metadata.getFilesCount());
Set<String> fileSet = Sets.newHashSet();
for (String file : files) {
fileSet.add(file);
}
for (String file : metadata.getFilesList()) {
assertTrue(fileSet.contains(file));
}
}
private void verifyRemoteFiles(String checkpointId, File checkpointedDir) throws Exception {
File[] files = checkpointedDir.listFiles();
assertNotNull(files);
for (File file : files) {
String remoteFile;
if (RocksUtils.isSstFile(file)) {
remoteFile = RocksUtils.getDestSstPath(store.name(), file.getName());
} else {
remoteFile = RocksUtils.getDestPath(store.name(), checkpointId, file);
}
verifyRemoteFile(remoteFile, file);
}
}
private void verifyRemoteFile(String remoteFile, File localFile) throws Exception {
assertEquals(checkpointStore.getFileLength(remoteFile), localFile.length());
@Cleanup InputStream remoteIs = checkpointStore.openInputStream(remoteFile);
@Cleanup InputStream localIs = new FileInputStream(localFile);
long numBytesToCompare = localFile.length();
while (numBytesToCompare > 0L) {
int numBytesToRead = (int) Math.min(numBytesToCompare, 1024);
byte[] localBytes = new byte[numBytesToRead];
byte[] remoteBytes = new byte[numBytesToRead];
ByteStreams.readFully(localIs, localBytes);
ByteStreams.readFully(remoteIs, remoteBytes);
assertArrayEquals(localBytes, remoteBytes);
numBytesToCompare -= numBytesToRead;
}
}
private List<String> createMultipleCheckpoints(int numCheckpoints,
boolean removeLocalCheckpointAfterCheckpoint,
boolean removeRemoteCheckpointsAfterCheckpoint)
throws Exception {
List<String> checkpointIds = new ArrayList<>(numCheckpoints);
try (RocksCheckpointer checkpointer = new RocksCheckpointer(
store.name(),
localDir,
store.getDb(),
checkpointStore,
removeLocalCheckpointAfterCheckpoint,
removeRemoteCheckpointsAfterCheckpoint)) {
int idx = 0;
for (int i = 0; i < numCheckpoints; i++) {
writeNumKvs(100, idx);
checkpointIds.add(createCheckpoint(checkpointer, i));
idx += 100;
}
}
return checkpointIds;
}
private String createCheckpoint(RocksCheckpointer checkpointer,
int checkpointIdx) throws StateStoreException {
byte[] txid = ("checkpoint-" + checkpointIdx).getBytes(UTF_8);
return checkpointer.checkpointAtTxid(txid);
}
/**
* Basic test.
*
* <p>- checkpoint a local state store to a remote checkpoint store
* - restore checkpoint from the remote checkpoint store
* - verify the restored local state store is correct
*/
@Test
public void testCheckpointRestore() throws Exception {
final int numKvs = 100;
final String dbName = runtime.getMethodName();
final byte[] txid = runtime.getMethodName().getBytes(UTF_8);
// first prepare rocksdb with 100 kvs;
writeNumKvs(numKvs, 0);
Checkpoint checkpoint = Checkpoint.create(store.getDb());
// checkpoint
RocksdbCheckpointTask checkpointTask = new RocksdbCheckpointTask(
dbName,
checkpoint,
localCheckpointsDir,
checkpointStore,
false,
false);
String checkpointId = checkpointTask.checkpoint(txid);
// checkpoint directory exists
File checkpointedDir = new File(localCheckpointsDir, checkpointId);
assertTrue(
"local checkpointed dir " + checkpointedDir + " doesn't exists when `removeLocalCheckpoints` is false",
checkpointedDir.exists());
// remote checkpoint metadata file exists
String checkpointMetadataFile = RocksUtils.getDestCheckpointMetadataPath(store.name(), checkpointId);
assertTrue(checkpointStore.fileExists(checkpointMetadataFile));
int fileLen = (int) checkpointStore.getFileLength(checkpointMetadataFile);
byte[] checkpointMetadataBytes = new byte[fileLen];
@Cleanup InputStream fileIn = checkpointStore.openInputStream(checkpointMetadataFile);
ByteStreams.readFully(fileIn, checkpointMetadataBytes);
// verify the checkpointed metadata exists
CheckpointMetadata metadata = CheckpointMetadata.parseFrom(checkpointMetadataBytes);
assertArrayEquals(txid, metadata.getTxid().toByteArray());
verifyCheckpointMetadata(checkpointedDir, metadata);
verifyRemoteFiles(checkpointId, checkpointedDir);
store.close();
// remove local checkpointed dir
MoreFiles.deleteRecursively(
Paths.get(checkpointedDir.getAbsolutePath()),
RecursiveDeleteOption.ALLOW_INSECURE);
assertFalse(checkpointedDir.exists());
// restore the checkpoint
RocksdbRestoreTask restoreTask = new RocksdbRestoreTask(
dbName,
localCheckpointsDir,
checkpointStore);
restoreTask.restore(checkpointId, metadata);
assertTrue(checkpointedDir.exists());
// verify the content
verifyCheckpointMetadata(checkpointedDir, metadata);
verifyRemoteFiles(checkpointId, checkpointedDir);
// make sure all the kvs are readable
store = new RocksdbKVStore<>();
store.init(spec);
verifyNumKvs(numKvs);
}
@Test
public void testRestoreCleanupCheckpoints() throws Exception {
// create 3 checkpoints and leave them locally
List<String> checkpointIds = createMultipleCheckpoints(3, false, false);
store.close();
List<String> remoteCheckpoints = checkpointStore.listFiles(RocksUtils.getDestCheckpointsPath(store.name()));
assertEquals(checkpointIds.size(), remoteCheckpoints.size());
for (String checkpoint : checkpointIds) {
assertTrue(remoteCheckpoints.contains(checkpoint));
assertTrue(new File(localCheckpointsDir, checkpoint).exists());
}
// restore from checkpoints
CheckpointMetadata metadata = RocksCheckpointer.restore(
store.name(),
localDir,
checkpointStore);
assertNotNull(metadata);
assertArrayEquals("checkpoint-2".getBytes(UTF_8), metadata.getTxid().toByteArray());
for (int i = 0; i < 3; i++) {
String checkpoint = checkpointIds.get(i);
if (i == 2) {
assertTrue(new File(localCheckpointsDir, checkpoint).exists());
} else {
assertFalse(new File(localCheckpointsDir, checkpoint).exists());
}
assertTrue(
checkpointStore.fileExists(RocksUtils.getDestCheckpointPath(store.name(), checkpoint)));
}
// restore from the latest checkpoint
store = new RocksdbKVStore<>();
store.init(spec);
verifyNumKvs(300);
}
@Test
public void testRestoreCheckpointMissingLocally() throws Exception {
// create 3 checkpoints and leave them locally
List<String> checkpointIds = createMultipleCheckpoints(3, false, false);
store.close();
List<String> remoteCheckpoints = checkpointStore.listFiles(RocksUtils.getDestCheckpointsPath(store.name()));
assertEquals(checkpointIds.size(), remoteCheckpoints.size());
for (String checkpoint : checkpointIds) {
assertTrue(remoteCheckpoints.contains(checkpoint));
assertTrue(new File(localCheckpointsDir, checkpoint).exists());
}
// remove a local checkpoint directory
MoreFiles.deleteRecursively(
Paths.get(localCheckpointsDir.getAbsolutePath(), checkpointIds.get(2)),
RecursiveDeleteOption.ALLOW_INSECURE);
// restore from checkpoints
CheckpointMetadata metadata = RocksCheckpointer.restore(
store.name(),
localDir,
checkpointStore);
assertNotNull(metadata);
assertArrayEquals("checkpoint-2".getBytes(UTF_8), metadata.getTxid().toByteArray());
for (int i = 0; i < 3; i++) {
String checkpoint = checkpointIds.get(i);
if (i == 2) {
assertTrue(new File(localCheckpointsDir, checkpoint).exists());
} else {
assertFalse(new File(localCheckpointsDir, checkpoint).exists());
}
assertTrue(
checkpointStore.fileExists(RocksUtils.getDestCheckpointPath(store.name(), checkpoint)));
}
// restore from the latest checkpoint
store = new RocksdbKVStore<>();
store.init(spec);
verifyNumKvs(300);
}
@Test
public void testRestoreLocalCheckpointCorrupted() throws Exception {
// create 1 checkpoints and leave them locally
List<String> checkpointIds = createMultipleCheckpoints(1, false, false);
store.close();
List<String> remoteCheckpoints = checkpointStore.listFiles(RocksUtils.getDestCheckpointsPath(store.name()));
assertEquals(checkpointIds.size(), remoteCheckpoints.size());
for (String checkpoint : checkpointIds) {
assertTrue(remoteCheckpoints.contains(checkpoint));
assertTrue(new File(localCheckpointsDir, checkpoint).exists());
}
// remove a local checkpoint directory
File[] files = new File(localCheckpointsDir, checkpointIds.get(0)).listFiles();
for (int i = 0; i < files.length / 2; i++) {
assertTrue(files[i].delete());
}
// restore from checkpoints
CheckpointMetadata metadata = RocksCheckpointer.restore(
store.name(),
localDir,
checkpointStore);
assertNotNull(metadata);
assertArrayEquals("checkpoint-0".getBytes(UTF_8), metadata.getTxid().toByteArray());
String checkpoint = checkpointIds.get(0);
assertTrue(new File(localCheckpointsDir, checkpoint).exists());
assertTrue(
checkpointStore.fileExists(RocksUtils.getDestCheckpointPath(store.name(), checkpoint)));
// restore from the latest checkpoint
store = new RocksdbKVStore<>();
store.init(spec);
verifyNumKvs(100);
}
}