/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.statelib.impl.kv;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.BLOCK_CACHE_SIZE;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.BLOCK_SIZE;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.DEFAULT_CHECKSUM_TYPE;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.DEFAULT_COMPACTION_STYLE;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.DEFAULT_COMPRESSION_TYPE;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.DEFAULT_LOG_LEVEL;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.DEFAULT_PARALLELISM;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.MAX_WRITE_BUFFERS;
import static org.apache.bookkeeper.statelib.impl.rocksdb.RocksConstants.WRITE_BUFFER_SIZE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import com.google.common.primitives.SignedBytes;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.coder.Coder;
import org.apache.bookkeeper.common.kv.KV;
import org.apache.bookkeeper.common.kv.KVImpl;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.exceptions.InvalidStateStoreException;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreException;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
import org.apache.bookkeeper.statelib.api.kv.KVIterator;
import org.apache.bookkeeper.statelib.api.kv.KVMulti;
import org.apache.bookkeeper.statelib.api.kv.KVStore;
import org.apache.bookkeeper.statelib.impl.Bytes;
import org.apache.bookkeeper.statelib.impl.rocksdb.RocksUtils;
import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.CheckpointInfo;
import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.RocksCheckpointer;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
/**
* A key/value store implemented using <a href="http://rocksdb.org/">rocksdb</a>.
*
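* <p>A minimal usage sketch (illustrative only; it assumes a {@code StateStoreSpec}
* configured with a name, key/value coders and a local directory, the fields read by
* {@link #init(StateStoreSpec)}):
* <pre>{@code
* RocksdbKVStore<String, String> store = new RocksdbKVStore<>();
* store.init(spec); // spec carries the name, coders and local store dir
* store.put("key", "value");
* String value = store.get("key");   // "value"
* String missing = store.get("foo"); // null when absent
* store.close();
* }</pre>
*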
* @param <K> key type
* @param <V> value type
*/
@Slf4j
public class RocksdbKVStore<K, V> implements KVStore<K, V> {
private static final byte[] METADATA_CF = ".meta".getBytes(UTF_8);
private static final byte[] DATA_CF = "default".getBytes(UTF_8);
private static final byte[] LAST_REVISION = ".lrev".getBytes(UTF_8);
private static final AtomicLongFieldUpdater<RocksdbKVStore> lastRevisionUpdater =
AtomicLongFieldUpdater.newUpdater(RocksdbKVStore.class, "lastRevision");
// parameters for the store
protected String name;
protected Coder<K> keyCoder;
protected Coder<V> valCoder;
// rocksdb state
protected File dbDir;
protected RocksDB db;
protected ColumnFamilyHandle metaCfHandle;
protected ColumnFamilyHandle dataCfHandle;
// iterators
protected final Set<KVIterator<K, V>> kvIters;
// options used by rocksdb
protected DBOptions dbOpts;
protected ColumnFamilyOptions cfOpts;
protected WriteOptions writeOpts;
protected FlushOptions flushOpts;
// states of the store
protected volatile boolean isInitialized = false;
protected volatile boolean closed = false;
protected volatile long lastRevision = -1L;
private final byte[] lastRevisionBytes = new byte[Long.BYTES];
// checkpointer store
private CheckpointStore checkpointStore;
private ScheduledExecutorService checkpointScheduler;
// rocksdb checkpointer
private RocksCheckpointer checkpointer;
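// load the native rocksdb library once per JVM, before any rocksdb objects are created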
static {
RocksDB.loadLibrary();
}
private boolean cleanupLocalStoreDirEnable;
public RocksdbKVStore() {
// initialize the iterators set
this.kvIters = Collections.synchronizedSet(Sets.newHashSet());
}
protected void checkStoreOpen() {
if (closed) {
throw new InvalidStateStoreException("State store " + name + " is already closed");
}
if (!isInitialized) {
throw new InvalidStateStoreException("State Store " + name + " is not initialized yet");
}
}
@VisibleForTesting
public synchronized RocksDB getDb() {
return db;
}
@Override
public synchronized String name() {
return this.name;
}
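/**
* Restores the local rocksdb from the checkpoints available in the checkpoint store,
* trying each candidate in order until one restores and opens successfully. The
* checkpoints that were not restored are then removed from the local store path.
*/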
private void loadRocksdbFromCheckpointStore(StateStoreSpec spec) {
checkNotNull(spec.getCheckpointIOScheduler(),
"checkpoint io scheduler is not configured");
checkNotNull(spec.getCheckpointDuration(),
"checkpoint duration is not configured");
String dbName = spec.getName();
File localStorePath = spec.getLocalStateStoreDir();
List<CheckpointInfo> checkpoints = RocksCheckpointer.getCheckpoints(dbName, spec.getCheckpointStore());
for (CheckpointInfo cpi : checkpoints) {
try {
cpi.restore(dbName, localStorePath, spec.getCheckpointStore());
openRocksdb(spec);
checkpoints.stream()
.filter(cp -> cp != cpi) // ignore the current restored checkpoint
.forEach(cp -> cp.remove(localStorePath)); // delete everything else
break;
} catch (StateStoreException e) {
// Got an exception. Log and try the next checkpoint
log.error("Failed to restore checkpoint: {}", cpi, e);
}
}
}
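/**
* Schedules an asynchronous checkpoint of the local store. The revision bytes are
* copied under the store lock, so the background task captures the revision as of
* this call even if writes continue afterwards.
*/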
@Override
public synchronized void checkpoint() {
log.info("Checkpoint local state store {} at revision {}", name, getLastRevision());
byte[] checkpointAtRevisionBytes = new byte[Long.BYTES];
System.arraycopy(lastRevisionBytes, 0, checkpointAtRevisionBytes, 0, checkpointAtRevisionBytes.length);
checkpointScheduler.submit(() -> {
try {
// TODO: move create checkpoint to the checkpoint method
checkpointer.checkpointAtTxid(checkpointAtRevisionBytes);
} catch (StateStoreException e) {
log.error("Failed to checkpoint state store {} at revision {}",
name, Bytes.toLong(checkpointAtRevisionBytes, 0), e);
}
});
}
private void readLastRevision() throws StateStoreException {
byte[] revisionBytes;
try {
revisionBytes = db.get(metaCfHandle, LAST_REVISION);
} catch (RocksDBException e) {
throw new StateStoreException("Failed to read last revision from state store " + name(), e);
}
if (null == revisionBytes) {
return;
}
long revision = Bytes.toLong(revisionBytes, 0);
lastRevisionUpdater.set(this, revision);
}
@Override
public long getLastRevision() {
return lastRevisionUpdater.get(this);
}
private void setLastRevision(long lastRevision) {
lastRevisionUpdater.set(this, lastRevision);
Bytes.toBytes(lastRevision, lastRevisionBytes, 0);
}
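// a negative revision marks a local mutation; a non-negative revision comes from replaying
// the log stream, where duplicates (revision <= last revision) must be skipped for idempotency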
private void updateLastRevision(long revision) {
if (revision >= 0) { // k/v comes from log stream
if (getLastRevision() >= revision) { // these k/v pairs are duplicates
return;
}
// update revision
setLastRevision(revision);
}
}
protected void updateLastRevision(WriteBatch batch, long revision) {
if (revision >= 0) { // k/v comes from log stream
if (getLastRevision() >= revision) { // these k/v pairs are duplicates
return;
}
try {
// update revision
setLastRevision(revision);
batch.put(metaCfHandle, LAST_REVISION, lastRevisionBytes);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException(
"Error while updating last revision " + revision + " from store " + name, e);
}
}
}
@Override
@SuppressWarnings("unchecked")
public synchronized void init(StateStoreSpec spec) throws StateStoreException {
checkNotNull(spec.getLocalStateStoreDir(),
"local state store directory is not configured");
this.name = spec.getName();
this.cleanupLocalStoreDirEnable = spec.isLocalStorageCleanupEnable();
// initialize the coders
this.keyCoder = (Coder<K>) spec.getKeyCoder();
this.valCoder = (Coder<V>) spec.getValCoder();
cleanupLocalStoreDir(spec.getLocalStateStoreDir());
checkpointStore = spec.getCheckpointStore();
if (null != checkpointStore) {
// load checkpoint from checkpoint store
loadRocksdbFromCheckpointStore(spec);
} else {
// open the rocksdb
openRocksdb(spec);
}
// once the rocksdb is opened, read the last revision
readLastRevision();
if (null != checkpointStore) {
checkpointer = new RocksCheckpointer(
name(),
dbDir,
db,
checkpointStore,
true,
true,
spec.isCheckpointChecksumEnable(),
spec.isCheckpointChecksumCompatible());
checkpointScheduler = spec.getCheckpointIOScheduler();
}
this.isInitialized = true;
}
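/**
* Builds the rocksdb table, db, column family, write and flush options, then opens
* the local db. The WAL is disabled here because durability comes from the log
* stream that feeds this store, not from rocksdb itself.
*/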
protected void openRocksdb(StateStoreSpec spec) throws StateStoreException {
// initialize the db options
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
final Cache cache = new LRUCache(BLOCK_CACHE_SIZE);
tableConfig.setBlockCache(cache);
tableConfig.setBlockSize(BLOCK_SIZE);
tableConfig.setChecksumType(DEFAULT_CHECKSUM_TYPE);
dbOpts = new DBOptions();
dbOpts.setCreateIfMissing(true);
dbOpts.setErrorIfExists(false);
dbOpts.setInfoLogLevel(DEFAULT_LOG_LEVEL);
dbOpts.setIncreaseParallelism(DEFAULT_PARALLELISM);
dbOpts.setCreateMissingColumnFamilies(true);
cfOpts = new ColumnFamilyOptions();
cfOpts.setTableFormatConfig(tableConfig);
cfOpts.setWriteBufferSize(WRITE_BUFFER_SIZE);
cfOpts.setCompressionType(DEFAULT_COMPRESSION_TYPE);
cfOpts.setCompactionStyle(DEFAULT_COMPACTION_STYLE);
cfOpts.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
// initialize the write options
writeOpts = new WriteOptions();
writeOpts.setDisableWAL(true); // disable the WAL, since the source of truth is the distributedlog stream
// initialize the flush options
flushOpts = new FlushOptions();
flushOpts.setWaitForFlush(true);
// open the rocksdb
this.dbDir = spec.getLocalStateStoreDir();
Pair<RocksDB, List<ColumnFamilyHandle>> dbPair = openLocalDB(dbDir, dbOpts, cfOpts);
this.db = dbPair.getLeft();
this.metaCfHandle = dbPair.getRight().get(0);
this.dataCfHandle = dbPair.getRight().get(1);
}
protected Pair<RocksDB, List<ColumnFamilyHandle>> openLocalDB(File dir,
DBOptions options,
ColumnFamilyOptions cfOpts)
throws StateStoreException {
return openRocksdb(dir, options, cfOpts);
}
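/**
* Opens a rocksdb instance at {@code dir}. On first open, the actual db files are
* created under {@code checkpoints/<uuid>} and {@code current} is a symlink pointing
* at that directory, which allows a checkpoint restore to repoint the symlink instead
* of copying files.
*/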
protected static Pair<RocksDB, List<ColumnFamilyHandle>> openRocksdb(
File dir, DBOptions options, ColumnFamilyOptions cfOpts)
throws StateStoreException {
ColumnFamilyDescriptor metaDesc = new ColumnFamilyDescriptor(METADATA_CF, cfOpts);
ColumnFamilyDescriptor dataDesc = new ColumnFamilyDescriptor(DATA_CF, cfOpts);
try {
// make sure the db directory (and any missing parents) exists
Files.createDirectories(dir.toPath());
File dbDir = new File(dir, "current");
if (!dbDir.exists()) {
// empty state
String uuid = UUID.randomUUID().toString();
Path checkpointPath = Paths.get(dir.getAbsolutePath(), "checkpoints", uuid);
Files.createDirectories(checkpointPath);
Files.createSymbolicLink(
Paths.get(dbDir.getAbsolutePath()),
checkpointPath);
}
List<ColumnFamilyHandle> cfHandles = Lists.newArrayListWithExpectedSize(2);
RocksDB db = RocksDB.open(
options,
dbDir.getAbsolutePath(),
Lists.newArrayList(metaDesc, dataDesc),
cfHandles);
return Pair.of(db, cfHandles);
} catch (IOException ioe) {
log.error("Failed to create parent directory {} for opening rocksdb", dir.getParentFile().toPath(), ioe);
throw new StateStoreException(ioe);
} catch (RocksDBException dbe) {
log.error("Failed to open rocksdb at dir {}", dir.getAbsolutePath(), dbe);
throw new StateStoreException(dbe);
}
}
@Override
public synchronized void flush() throws StateStoreException {
if (null == db) {
return;
}
try {
db.flush(flushOpts);
} catch (RocksDBException e) {
throw new StateStoreException("Exception on flushing rocksdb from store " + name, e);
}
}
@Override
public synchronized void close() {
if (closed) {
return;
}
closed = true;
if (null != checkpointer) {
checkpointer.close();
}
// close iterators
closeIters();
// close db
closeLocalDB();
// release options
RocksUtils.close(dbOpts);
RocksUtils.close(writeOpts);
RocksUtils.close(flushOpts);
RocksUtils.close(cfOpts);
cleanupLocalStoreDir(dbDir);
}
private void cleanupLocalStoreDir(File dbDir) {
if (cleanupLocalStoreDirEnable) {
if (dbDir.exists()) {
try {
MoreFiles.deleteRecursively(dbDir.toPath(), RecursiveDeleteOption.ALLOW_INSECURE);
} catch (IOException e) {
log.error("Failed to cleanup localStoreDir", e);
}
}
}
}
protected void closeLocalDB() {
try {
flush();
} catch (StateStoreException e) {
log.warn("Failed to flush rocksdb while closing local db of store {}", name, e);
}
RocksUtils.close(metaCfHandle);
RocksUtils.close(dataCfHandle);
RocksUtils.close(db);
}
private void closeIters() {
Set<KVIterator<K, V>> iterators;
synchronized (kvIters) {
iterators = Sets.newHashSet(kvIters);
}
iterators.forEach(KVIterator::close);
}
@Override
public synchronized V get(K key) {
checkNotNull(key, "key cannot be null");
checkStoreOpen();
byte[] keyBytes = keyCoder.encode(key);
return getRaw(key, keyBytes);
}
private V getRaw(K key, byte[] keyBytes) {
byte[] valBytes = getRawBytes(key, keyBytes);
if (null == valBytes) {
return null;
}
return valCoder.decode(valBytes);
}
protected byte[] getRawBytes(K key, byte[] keyBytes) {
try {
return this.db.get(dataCfHandle, keyBytes);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException("Error while getting value for key " + key + " from store " + name, e);
}
}
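/**
* Returns an iterator over the key range {@code [from, to]}. A {@code null} from key
* seeks to the first entry; a {@code null} to key iterates to the end of the store.
* Note that the to key, when present, is inclusive. Callers must close the returned
* iterator; open iterators are also closed when the store itself is closed.
*/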
@Override
public synchronized KVIterator<K, V> range(K from, K to) {
checkStoreOpen();
RocksIterator rocksIter = db.newIterator(dataCfHandle);
if (null == from) {
rocksIter.seekToFirst();
} else {
byte[] fromBytes = keyCoder.encode(from);
rocksIter.seek(fromBytes);
}
KVIterator<K, V> kvIter;
if (null == to) {
kvIter = new RocksdbKVIterator(name, rocksIter, keyCoder, valCoder);
} else {
kvIter = new RocksdbRangeIterator(name, rocksIter, keyCoder, valCoder, to);
}
kvIters.add(kvIter);
return kvIter;
}
@Override
public synchronized void put(K key, V value) {
put(key, value, -1);
}
synchronized void put(K key, V value, long revision) {
checkNotNull(key, "key cannot be null");
checkStoreOpen();
updateLastRevision(revision);
byte[] keyBytes = keyCoder.encode(key);
putRaw(key, keyBytes, value, revision);
}
private void putRaw(K key, byte[] keyBytes, V value, long revision) {
try {
WriteBatch batch = new WriteBatch();
if (revision > 0) {
// last revision has been set to revision bytes
batch.put(metaCfHandle, LAST_REVISION, lastRevisionBytes);
}
if (null == value) {
// delete a key if value is null
batch.delete(dataCfHandle, keyBytes);
} else {
byte[] valBytes = valCoder.encode(value);
batch.put(dataCfHandle, keyBytes, valBytes);
}
db.write(writeOpts, batch);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException("Error while updating key " + key
+ " to value " + value + " from store " + name, e);
}
}
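/**
* Associates the value with the key only if the key is currently absent. Returns the
* existing value if there is one, otherwise stores the new value and returns
* {@code null}. A {@code null} value is a no-op on an absent key. The read-check-write
* sequence is made atomic by the store's monitor, which all synchronized mutations share.
*/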
@Override
public V putIfAbsent(K key, V value) {
return putIfAbsent(key, value, -1L);
}
synchronized V putIfAbsent(K key, V value, long revision) {
checkNotNull(key, "key cannot be null");
checkStoreOpen();
updateLastRevision(revision);
byte[] keyBytes = keyCoder.encode(key);
V oldVal = getRaw(key, keyBytes);
if (null != oldVal) {
return oldVal;
}
if (value == null) {
return null;
}
putRaw(key, keyBytes, value, revision);
return null;
}
@Override
public synchronized KVMulti<K, V> multi() {
checkStoreOpen();
return new KVMultiImpl();
}
@Override
public synchronized V delete(K key) {
return delete(key, -1L);
}
synchronized V delete(K key, long revision) {
checkNotNull(key, "key cannot be null");
checkStoreOpen();
updateLastRevision(revision);
byte[] keyBytes = keyCoder.encode(key);
V val = getRaw(key, keyBytes);
putRaw(key, keyBytes, null, revision);
return val;
}
//
// Multi
//
/**
* A rocksdb based multi operation.
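*
* <p>Mutations are buffered in a {@link WriteBatch} and applied atomically by
* {@link #execute()}. A short usage sketch:
* <pre>{@code
* KVMulti<String, String> multi = store.multi();
* multi.put("k1", "v1");
* multi.delete("k2");
* multi.execute(); // applied as a single atomic batch
* }</pre>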
*/
class KVMultiImpl implements KVMulti<K, V> {
private final WriteBatch batch = new WriteBatch();
private volatile boolean executed = false;
private void checkExecuted() {
if (executed) {
throw new StateStoreRuntimeException("KVMulti#execute() has been called");
}
}
@Override
public void put(K key, V value) {
checkNotNull(key, "key cannot be null");
checkExecuted();
byte[] keyBytes = keyCoder.encode(key);
if (null == value) {
deleteRaw(keyBytes);
} else {
putRaw(keyBytes, value);
}
}
private void putRaw(byte[] keyBytes, V value) {
try {
batch.put(dataCfHandle, keyBytes, valCoder.encode(value));
} catch (RocksDBException e) {
throw new StateStoreRuntimeException(e);
}
}
@Override
public void delete(K key) {
checkNotNull(key, "key cannot be null");
checkExecuted();
byte[] keyBytes = keyCoder.encode(key);
deleteRaw(keyBytes);
}
private void deleteRaw(byte[] keyBytes) {
try {
batch.delete(dataCfHandle, keyBytes);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException(e);
}
}
@Override
public void deleteRange(K from, K to) {
checkNotNull(from, "from key cannot be null");
checkNotNull(to, "to key cannot be null");
checkExecuted();
byte[] fromBytes = keyCoder.encode(from);
byte[] toBytes = keyCoder.encode(to);
try {
batch.deleteRange(dataCfHandle, fromBytes, toBytes);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException(e);
}
}
@Override
public synchronized void execute() {
if (executed) {
return;
}
checkStoreOpen();
executed = true;
try {
getDb().write(writeOpts, batch);
} catch (RocksDBException e) {
throw new StateStoreRuntimeException("Error while executing a multi operation from store " + name, e);
} finally {
RocksUtils.close(batch);
}
}
}
//
// Iterators
//
/**
* KV iterator over a rocksdb instance.
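*
* <p>Open iterators are tracked in {@code kvIters} so that closing the store also
* closes any iterators still held by callers.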
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class RocksdbKVIterator implements KVIterator<K, V> {
final String name;
final RocksIterator iterator;
final Coder<K> keyCoder;
final Coder<V> valCoder;
private volatile boolean closed = false;
/**
* Ensure an iterator is open.
*
* @throws InvalidStateStoreException when the store is in closed state.
*/
private void ensureIteratorOpen() {
if (closed) {
throw new InvalidStateStoreException("Rocksdb state store " + name + " is already closed");
}
}
@Override
public void close() {
kvIters.remove(this);
iterator.close();
closed = true;
}
@Override
public boolean hasNext() {
ensureIteratorOpen();
return iterator.isValid();
}
private KV<K, V> getKvPair() {
return new KVImpl<>(keyCoder.decode(iterator.key()), valCoder.decode(iterator.value()));
}
@Override
public KV<K, V> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
KV<K, V> kv = getKvPair();
iterator.next();
return kv;
}
}
/**
* KV iterator over a rocksdb, which ends at the provided <i>endKey</i>.
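*
* <p>The end key is compared on the encoded bytes using a signed lexicographical
* comparator, and the bound is inclusive.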
*/
class RocksdbRangeIterator extends RocksdbKVIterator {
private final Comparator<byte[]> comparator = SignedBytes.lexicographicalComparator();
private final byte[] endKeyBytes;
private RocksdbRangeIterator(String name,
RocksIterator iterator,
Coder<K> keyCoder,
Coder<V> valCoder,
K endKey) {
super(name, iterator, keyCoder, valCoder);
checkNotNull(endKey, "End key cannot be null");
this.endKeyBytes = keyCoder.encode(endKey);
}
@Override
public boolean hasNext() {
return super.hasNext()
&& comparator.compare(iterator.key(), endKeyBytes) <= 0;
}
}
}