/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.rocksdb;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ColumnFamilyOptionsInterface;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.DBOptionsInterface;
import org.rocksdb.Env;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.MutableColumnFamilyOptionsInterface;
import org.rocksdb.MutableDBOptionsInterface;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.SstFileManager;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.serializer.BinarySerializer;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import org.apache.hugegraph.backend.store.BackendEntryIterator;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBIteratorPool.ReusedRocksIterator;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.util.StringEncoding;
import com.google.common.collect.ImmutableList;
public class RocksDBStdSessions extends RocksDBSessions {
private static final Logger LOG = Log.logger(RocksDBStdSessions.class);
private final HugeConfig config;
private final String dataPath;
private final String walPath;
private volatile OpenedRocksDB rocksdb;
private final AtomicInteger refCount;
public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath, String walPath)
throws RocksDBException {
super(config, database, store);
this.config = config;
this.dataPath = dataPath;
this.walPath = walPath;
this.rocksdb = RocksDBStdSessions.openRocksDB(config, dataPath,
walPath);
this.refCount = new AtomicInteger(1);
}
public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath, String walPath,
List<String> cfNames) throws RocksDBException {
super(config, database, store);
this.config = config;
this.dataPath = dataPath;
this.walPath = walPath;
this.rocksdb = RocksDBStdSessions.openRocksDB(config, cfNames,
dataPath, walPath);
this.refCount = new AtomicInteger(1);
this.ingestExternalFile();
}
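    /*
     * Copy constructor: the new sessions object shares the same OpenedRocksDB
     * instance and reference counter as the origin, so the underlying RocksDB
     * is closed only when the last copy calls doClose().
     */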
private RocksDBStdSessions(HugeConfig config, String database, String store,
RocksDBStdSessions origin) {
super(config, database, store);
this.config = config;
this.dataPath = origin.dataPath;
this.walPath = origin.walPath;
this.rocksdb = origin.rocksdb;
this.refCount = origin.refCount;
this.refCount.incrementAndGet();
}
@Override
public void open() throws Exception {
// pass
}
@Override
protected boolean opened() {
return this.rocksdb != null && this.rocksdb.isOwningHandle();
}
@Override
public Set<String> openedTables() {
return this.rocksdb.cfs();
}
@Override
public synchronized void createTable(String... tables)
throws RocksDBException {
this.checkValid();
List<ColumnFamilyDescriptor> cfds = new ArrayList<>();
for (String table : tables) {
if (this.rocksdb.existCf(table)) {
continue;
}
ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(
encode(table));
ColumnFamilyOptions options = cfd.getOptions();
initOptions(this.config(), null, null, options, options);
cfds.add(cfd);
}
        /*
         * To speed up the creation of tables (e.g. truncate() in the
         * TinkerPop tests), we call createColumnFamilies() instead of
         * createColumnFamily().
         */
List<ColumnFamilyHandle> cfhs = this.rocksdb().createColumnFamilies(cfds);
for (ColumnFamilyHandle cfh : cfhs) {
String table = decode(cfh.getName());
this.rocksdb.addCf(table, new OpenedRocksDB.CFHandle(this.rocksdb(), cfh));
}
this.ingestExternalFile();
}
@Override
public synchronized void dropTable(String... tables)
throws RocksDBException {
this.checkValid();
        /*
         * Dropping a CF while someone is still reading or writing it may
         * cause a bug, so use CFHandle to wait for other users and then
         * do the drop:
         * https://github.com/apache/hugegraph/issues/697
         */
List<ColumnFamilyHandle> cfhs = new ArrayList<>();
for (String table : tables) {
OpenedRocksDB.CFHandle cfh = this.rocksdb.cf(table);
if (cfh == null) {
continue;
}
cfhs.add(cfh.waitForDrop());
}
        /*
         * To speed up the dropping of tables (e.g. truncate() in the
         * TinkerPop tests), we call dropColumnFamilies() instead of
         * dropColumnFamily().
         */
this.rocksdb().dropColumnFamilies(cfhs);
for (String table : tables) {
OpenedRocksDB.CFHandle cfh = this.rocksdb.cf(table);
if (cfh == null) {
continue;
}
cfh.destroy();
this.rocksdb.removeCf(table);
}
}
@Override
public boolean existsTable(String table) {
return this.rocksdb.existCf(table);
}
@Override
public void reloadRocksDB() throws RocksDBException {
if (this.rocksdb.isOwningHandle()) {
this.rocksdb.close();
}
this.rocksdb = RocksDBStdSessions.openRocksDB(this.config,
ImmutableList.of(),
this.dataPath,
this.walPath);
}
@Override
public void forceCloseRocksDB() {
this.rocksdb().close();
}
@Override
public List<String> property(String property) {
try {
if (property.equals(RocksDBMetrics.KEY_DISK_USAGE)) {
long size = this.rocksdb.totalSize();
return ImmutableList.of(String.valueOf(size));
}
List<String> values = new ArrayList<>();
for (String cf : this.openedTables()) {
try (OpenedRocksDB.CFHandle cfh = this.cf(cf)) {
values.add(this.rocksdb().getProperty(cfh.get(), property));
}
}
return values;
} catch (RocksDBException | UnsupportedOperationException e) {
throw new BackendException(e);
}
}
@Override
public void compactRange() {
try {
// Waits while compaction is performed on the background threads
// rocksdb().flush(new FlushOptions())
rocksdb().compactRange();
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBStdSessions(config, database, store, this);
}
@Override
public void createSnapshot(String snapshotPath) {
this.rocksdb.createCheckpoint(snapshotPath);
}
@Override
public void resumeSnapshot(String snapshotPath) {
File originDataDir = new File(this.dataPath);
File snapshotDir = new File(snapshotPath);
try {
            /*
             * Close the current instance first.
             * NOTE: the rocksdb instance must be closed before deleting the
             * data directory; closing it only after the snapshot directory
             * has been moved to the origin position may produce dirty data.
             */
this.forceCloseRocksDB();
// Delete origin data directory
if (originDataDir.exists()) {
LOG.info("Delete origin data directory {}", originDataDir);
FileUtils.deleteDirectory(originDataDir);
}
// Move snapshot directory to origin data directory
FileUtils.moveDirectory(snapshotDir, originDataDir);
LOG.info("Move snapshot directory {} to {}",
snapshotDir, originDataDir);
// Reload rocksdb instance
this.reloadRocksDB();
} catch (Exception e) {
throw new BackendException("Failed to resume snapshot '%s' to' %s'",
e, snapshotDir, this.dataPath);
}
}
@Override
public String buildSnapshotPath(String snapshotPrefix) {
// Like: parent_path/rocksdb-data/*, * can be g,m,s
Path originDataPath = Paths.get(this.dataPath);
Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath());
// Like: parent_path/snapshot_rocksdb-data/*
Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
pureDataPath);
E.checkArgument(snapshotPath.toFile().exists(),
"The snapshot path '%s' doesn't exist",
snapshotPath);
return snapshotPath.toString();
}
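    /*
     * Hard-link the snapshot into a temporary directory beside the data path:
     * the snapshot directory is opened as a RocksDB instance and a checkpoint
     * is created from it; checkpoints hard-link SST files when the source and
     * target are on the same filesystem.
     */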
@Override
public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
String snapshotLinkPath = this.dataPath + "_temp";
try (OpenedRocksDB rocksdb = openRocksDB(this.config,
ImmutableList.of(),
snapshotPath, null)) {
rocksdb.createCheckpoint(snapshotLinkPath);
}
LOG.info("The snapshot {} has been hard linked to {}",
snapshotPath, snapshotLinkPath);
return snapshotLinkPath;
}
@Override
public final Session session() {
return (Session) super.getOrNewSession();
}
@Override
protected final Session newSession() {
E.checkState(this.rocksdb.isOwningHandle(),
"RocksDB has not been initialized");
return new StdSession(this.config());
}
@Override
protected synchronized void doClose() {
this.checkValid();
if (this.refCount.decrementAndGet() > 0) {
return;
}
assert this.refCount.get() == 0;
this.rocksdb.close();
}
private void checkValid() {
E.checkState(this.rocksdb.isOwningHandle(),
"It seems RocksDB has been closed");
}
private RocksDB rocksdb() {
this.checkValid();
return this.rocksdb.rocksdb();
}
private OpenedRocksDB.CFHandle cf(String cfName) {
OpenedRocksDB.CFHandle cfh = this.rocksdb.cf(cfName);
if (cfh == null) {
throw new BackendException("Table '%s' is not opened", cfName);
}
cfh.open();
return cfh;
}
private void ingestExternalFile() throws RocksDBException {
String directory = this.config().get(RocksDBOptions.SST_PATH);
if (directory == null || directory.isEmpty()) {
return;
}
RocksDBIngester ingester = new RocksDBIngester(this.rocksdb());
        // Ingest all *.sst files in each directory named after the cf name
for (String cf : this.rocksdb.cfs()) {
Path path = Paths.get(directory, cf);
if (path.toFile().isDirectory()) {
try (OpenedRocksDB.CFHandle cfh = this.cf(cf)) {
ingester.ingest(path, cfh.get());
}
}
}
}
private static OpenedRocksDB openRocksDB(HugeConfig config,
String dataPath, String walPath)
throws RocksDBException {
// Init options
Options options = new Options();
RocksDBStdSessions.initOptions(config, options, options,
options, options);
options.setWalDir(walPath);
SstFileManager sstFileManager = new SstFileManager(Env.getDefault());
options.setSstFileManager(sstFileManager);
        /*
         * Open RocksDB for the first time
         * Don't merge old CFs, we expect a clean (empty) DB when using this one
         */
RocksDB rocksdb = RocksDB.open(options, dataPath);
Map<String, OpenedRocksDB.CFHandle> cfs = new ConcurrentHashMap<>();
return new OpenedRocksDB(rocksdb, cfs, sstFileManager);
}
private static OpenedRocksDB openRocksDB(HugeConfig config,
List<String> cfNames,
String dataPath, String walPath)
throws RocksDBException {
// Old CFs should always be opened
Set<String> mergedCFs = RocksDBStdSessions.mergeOldCFs(dataPath,
cfNames);
List<String> cfs = ImmutableList.copyOf(mergedCFs);
// Init CFs options
List<ColumnFamilyDescriptor> cfds = new ArrayList<>(cfs.size());
for (String cf : cfs) {
ColumnFamilyDescriptor cfd = new ColumnFamilyDescriptor(encode(cf));
ColumnFamilyOptions options = cfd.getOptions();
RocksDBStdSessions.initOptions(config, null, null,
options, options);
cfds.add(cfd);
}
// Init DB options
DBOptions options = new DBOptions();
RocksDBStdSessions.initOptions(config, options, options, null, null);
if (walPath != null) {
options.setWalDir(walPath);
}
SstFileManager sstFileManager = new SstFileManager(Env.getDefault());
options.setSstFileManager(sstFileManager);
// Open RocksDB with CFs
List<ColumnFamilyHandle> cfhs = new ArrayList<>();
RocksDB rocksdb = RocksDB.open(options, dataPath, cfds, cfhs);
E.checkState(cfhs.size() == cfs.size(),
"Expect same size of cf-handles and cf-names");
// Collect CF Handles
Map<String, OpenedRocksDB.CFHandle> cfHandles = new ConcurrentHashMap<>();
for (int i = 0; i < cfs.size(); i++) {
cfHandles.put(cfs.get(i), new OpenedRocksDB.CFHandle(rocksdb, cfhs.get(i)));
}
return new OpenedRocksDB(rocksdb, cfHandles, sstFileManager);
}
private static Set<String> mergeOldCFs(String path, List<String> cfNames)
throws RocksDBException {
Set<String> cfs = listCFs(path);
cfs.addAll(cfNames);
return cfs;
}
public static Set<String> listCFs(String path) throws RocksDBException {
Set<String> cfs = new HashSet<>();
List<byte[]> oldCFs = RocksDB.listColumnFamilies(new Options(), path);
if (oldCFs.isEmpty()) {
cfs.add("default");
} else {
for (byte[] oldCF : oldCFs) {
cfs.add(decode(oldCF));
}
}
return cfs;
}
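    /*
     * Apply the configured RocksDB options to the given option views.
     * The same object may back several views: a single Options instance
     * implements all four interfaces (as in openRocksDB(config, dataPath,
     * walPath)), while DBOptions and ColumnFamilyOptions are passed
     * separately when opening with explicit column families. A minimal
     * usage sketch:
     *
     *   Options options = new Options();
     *   initOptions(config, options, options, options, options);
     *
     *   ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
     *   initOptions(config, null, null, cfOptions, cfOptions);
     */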
public static void initOptions(HugeConfig conf,
DBOptionsInterface<?> db,
MutableDBOptionsInterface<?> mdb,
ColumnFamilyOptionsInterface<?> cf,
MutableColumnFamilyOptionsInterface<?> mcf) {
final boolean optimize = conf.get(RocksDBOptions.OPTIMIZE_MODE);
if (db != null) {
            /*
             * Set to true so the database will be created if it is missing.
             * Should we also use options.setCreateMissingColumnFamilies()?
             */
db.setCreateIfMissing(true);
db.setWriteDbidToManifest(true);
db.setAvoidUnnecessaryBlockingIO(true);
// Optimize RocksDB
if (optimize) {
int processors = CoreOptions.CPUS;
db.setIncreaseParallelism(Math.max(processors / 2, 1));
db.setAllowConcurrentMemtableWrite(true);
db.setEnableWriteThreadAdaptiveYield(true);
}
db.setInfoLogLevel(InfoLogLevel.valueOf(
conf.get(RocksDBOptions.LOG_LEVEL) + "_LEVEL"));
db.setMaxSubcompactions(
conf.get(RocksDBOptions.MAX_SUB_COMPACTIONS));
db.setAllowMmapWrites(
conf.get(RocksDBOptions.ALLOW_MMAP_WRITES));
db.setAllowMmapReads(
conf.get(RocksDBOptions.ALLOW_MMAP_READS));
db.setUseDirectReads(
conf.get(RocksDBOptions.USE_DIRECT_READS));
db.setUseDirectIoForFlushAndCompaction(
conf.get(RocksDBOptions.USE_DIRECT_READS_WRITES_FC));
db.setUseFsync(conf.get(RocksDBOptions.USE_FSYNC));
db.setAtomicFlush(conf.get(RocksDBOptions.ATOMIC_FLUSH));
db.setMaxManifestFileSize(
conf.get(RocksDBOptions.MAX_MANIFEST_FILE_SIZE));
db.setSkipStatsUpdateOnDbOpen(
conf.get(RocksDBOptions.SKIP_STATS_UPDATE_ON_DB_OPEN));
db.setSkipCheckingSstFileSizesOnDbOpen(
conf.get(RocksDBOptions.SKIP_CHECK_SIZE_ON_DB_OPEN));
db.setMaxFileOpeningThreads(
conf.get(RocksDBOptions.MAX_FILE_OPENING_THREADS));
db.setDbWriteBufferSize(conf.get(RocksDBOptions.DB_MEMTABLE_SIZE));
db.setLogReadaheadSize(conf.get(RocksDBOptions.LOG_READAHEAD_SIZE));
// A global cache for table-level rows
long cacheCapacity = conf.get(RocksDBOptions.ROW_CACHE_CAPACITY);
if (cacheCapacity > 0) {
db.setRowCache(new LRUCache(cacheCapacity));
}
}
if (mdb != null) {
/*
* Migrate to max_background_jobs option
* https://github.com/facebook/rocksdb/wiki/Thread-Pool
* https://github.com/facebook/rocksdb/pull/2205/files
*/
mdb.setMaxBackgroundJobs(conf.get(RocksDBOptions.MAX_BG_JOBS));
mdb.setDelayedWriteRate(
conf.get(RocksDBOptions.DELAYED_WRITE_RATE));
mdb.setMaxOpenFiles(conf.get(RocksDBOptions.MAX_OPEN_FILES));
mdb.setMaxTotalWalSize(conf.get(RocksDBOptions.MAX_TOTAL_WAL_SIZE));
mdb.setBytesPerSync(conf.get(RocksDBOptions.BYTES_PER_SYNC));
mdb.setWalBytesPerSync(conf.get(RocksDBOptions.WAL_BYTES_PER_SYNC));
mdb.setStrictBytesPerSync(
conf.get(RocksDBOptions.STRICT_BYTES_PER_SYNC));
mdb.setCompactionReadaheadSize(
conf.get(RocksDBOptions.COMPACTION_READAHEAD_SIZE));
mdb.setDeleteObsoleteFilesPeriodMicros(1000000 *
conf.get(RocksDBOptions.DELETE_OBSOLETE_FILE_PERIOD));
}
if (cf != null) {
if (optimize) {
// Optimize RocksDB
cf.optimizeLevelStyleCompaction();
cf.optimizeUniversalStyleCompaction();
}
int numLevels = conf.get(RocksDBOptions.NUM_LEVELS);
List<CompressionType> compressions = conf.get(
RocksDBOptions.LEVELS_COMPRESSIONS);
E.checkArgument(compressions.isEmpty() ||
compressions.size() == numLevels,
"Elements number of '%s' must be 0 or " +
"be the same as '%s', but got %s != %s",
RocksDBOptions.LEVELS_COMPRESSIONS.name(),
RocksDBOptions.NUM_LEVELS.name(),
compressions.size(), numLevels);
cf.setNumLevels(numLevels);
cf.setCompactionStyle(conf.get(RocksDBOptions.COMPACTION_STYLE));
cf.setBottommostCompressionType(
conf.get(RocksDBOptions.BOTTOMMOST_COMPRESSION));
if (!compressions.isEmpty()) {
cf.setCompressionPerLevel(compressions);
}
cf.setMinWriteBufferNumberToMerge(
conf.get(RocksDBOptions.MIN_MEMTABLES_TO_MERGE));
cf.setMaxWriteBufferNumberToMaintain(
conf.get(RocksDBOptions.MAX_MEMTABLES_TO_MAINTAIN));
cf.setInplaceUpdateSupport(
conf.get(RocksDBOptions.MEMTABLE_INPLACE_UPDATE_SUPPORT));
cf.setLevelCompactionDynamicLevelBytes(
conf.get(RocksDBOptions.DYNAMIC_LEVEL_BYTES));
cf.setOptimizeFiltersForHits(
conf.get(RocksDBOptions.BLOOM_FILTERS_SKIP_LAST_LEVEL));
cf.setTableFormatConfig(initTableConfig(conf));
// CappedPrefixExtractor uses the first N bytes
int prefixLength = conf.get(RocksDBOptions.PREFIX_EXTRACTOR_CAPPED);
if (prefixLength > 0) {
cf.useCappedPrefixExtractor(prefixLength);
}
// https://github.com/facebook/rocksdb/tree/master/utilities/merge_operators
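            /*
             * uint64add interprets the stored value and each merge operand as
             * a 64-bit integer and sums them (used by merge()/increase() in
             * StdSession); stringappend would concatenate values instead.
             */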
cf.setMergeOperatorName("uint64add"); // uint64add/stringappend
}
if (mcf != null) {
mcf.setCompressionType(conf.get(RocksDBOptions.COMPRESSION));
mcf.setWriteBufferSize(conf.get(RocksDBOptions.MEMTABLE_SIZE));
mcf.setMaxWriteBufferNumber(conf.get(RocksDBOptions.MAX_MEMTABLES));
mcf.setMaxBytesForLevelBase(
conf.get(RocksDBOptions.MAX_LEVEL1_BYTES));
mcf.setMaxBytesForLevelMultiplier(
conf.get(RocksDBOptions.MAX_LEVEL_BYTES_MULTIPLIER));
mcf.setTargetFileSizeBase(
conf.get(RocksDBOptions.TARGET_FILE_SIZE_BASE));
mcf.setTargetFileSizeMultiplier(
conf.get(RocksDBOptions.TARGET_FILE_SIZE_MULTIPLIER));
mcf.setLevel0FileNumCompactionTrigger(
conf.get(RocksDBOptions.LEVEL0_COMPACTION_TRIGGER));
mcf.setLevel0SlowdownWritesTrigger(
conf.get(RocksDBOptions.LEVEL0_SLOWDOWN_WRITES_TRIGGER));
mcf.setLevel0StopWritesTrigger(
conf.get(RocksDBOptions.LEVEL0_STOP_WRITES_TRIGGER));
mcf.setSoftPendingCompactionBytesLimit(
conf.get(RocksDBOptions.SOFT_PENDING_COMPACTION_LIMIT));
mcf.setHardPendingCompactionBytesLimit(
conf.get(RocksDBOptions.HARD_PENDING_COMPACTION_LIMIT));
            /*
             * TODO: also set memtable options:
             * memtable_insert_with_hint_prefix_extractor
             * The reason why the option name `memtable_bloom_size_ratio` is used:
             * https://github.com/facebook/rocksdb/pull/9453/files
             * #diff-cde52d1fcbcce2bc6aae27838f1d3e7e9e469ccad8aaf8f2695f939e279d7501R369
             */
mcf.setMemtablePrefixBloomSizeRatio(
conf.get(RocksDBOptions.MEMTABLE_BLOOM_SIZE_RATIO));
mcf.setMemtableWholeKeyFiltering(
conf.get(RocksDBOptions.MEMTABLE_BLOOM_WHOLE_KEY_FILTERING));
mcf.setMemtableHugePageSize(
conf.get(RocksDBOptions.MEMTABL_BLOOM_HUGE_PAGE_SIZE));
boolean bulkload = conf.get(RocksDBOptions.BULKLOAD_MODE);
if (bulkload) {
// Disable automatic compaction
mcf.setDisableAutoCompactions(true);
int trigger = Integer.MAX_VALUE;
mcf.setLevel0FileNumCompactionTrigger(trigger);
mcf.setLevel0SlowdownWritesTrigger(trigger);
mcf.setLevel0StopWritesTrigger(trigger);
long limit = Long.MAX_VALUE;
mcf.setSoftPendingCompactionBytesLimit(limit);
mcf.setHardPendingCompactionBytesLimit(limit);
//cf.setMemTableConfig(new VectorMemTableConfig());
}
}
}
public static TableFormatConfig initTableConfig(HugeConfig conf) {
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setFormatVersion(
conf.get(RocksDBOptions.TABLE_FORMAT_VERSION));
/*
         * The index type used to look up data blocks:
* https://github.com/facebook/rocksdb/wiki/Index-Block-Format
*
* TODO: support more index options:
* tableConfig.setIndexShortening(IndexShorteningMode.kShortenSeparators);
* tableConfig.setEnableIndexCompression(true);
* tableConfig.setIndexBlockRestartInterval(1);
*/
tableConfig.setIndexType(conf.get(RocksDBOptions.INDEX_TYPE));
/*
* The search type of point lookup can be BinarySearch or HashSearch:
* https://github.com/facebook/rocksdb/wiki/Data-Block-Hash-Index
*/
tableConfig.setDataBlockIndexType(
conf.get(RocksDBOptions.DATA_BLOCK_SEARCH_TYPE));
tableConfig.setDataBlockHashTableUtilRatio(
conf.get(RocksDBOptions.DATA_BLOCK_HASH_TABLE_RATIO));
long blockSize = conf.get(RocksDBOptions.BLOCK_SIZE);
tableConfig.setBlockSize(blockSize);
tableConfig.setBlockSizeDeviation(
conf.get(RocksDBOptions.BLOCK_SIZE_DEVIATION));
tableConfig.setBlockRestartInterval(
conf.get(RocksDBOptions.BLOCK_RESTART_INTERVAL));
// https://github.com/facebook/rocksdb/wiki/Block-Cache
long cacheCapacity = conf.get(RocksDBOptions.BLOCK_CACHE_CAPACITY);
if (cacheCapacity <= 0L) {
// Bypassing bug https://github.com/facebook/rocksdb/pull/5465
tableConfig.setNoBlockCache(true);
} else {
tableConfig.setBlockCache(new LRUCache(cacheCapacity));
}
// https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
int bitsPerKey = conf.get(RocksDBOptions.BLOOM_FILTER_BITS_PER_KEY);
if (bitsPerKey >= 0) {
// TODO: use space-saving RibbonFilterPolicy
boolean blockBased = conf.get(RocksDBOptions.BLOOM_FILTER_MODE);
tableConfig.setFilterPolicy(new BloomFilter(bitsPerKey,
blockBased));
tableConfig.setWholeKeyFiltering(
conf.get(RocksDBOptions.BLOOM_FILTER_WHOLE_KEY));
tableConfig.setCacheIndexAndFilterBlocks(
conf.get(RocksDBOptions.CACHE_FILTER_AND_INDEX));
tableConfig.setPinL0FilterAndIndexBlocksInCache(
conf.get(RocksDBOptions.PIN_L0_INDEX_AND_FILTER));
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
if (conf.get(RocksDBOptions.PARTITION_FILTERS_INDEXES)) {
// Enable partitioned indexes and partitioned filters
tableConfig.setPartitionFilters(true)
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setMetadataBlockSize(blockSize)
.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(
conf.get(RocksDBOptions.PIN_TOP_INDEX_AND_FILTER));
}
}
return tableConfig;
}
public static byte[] encode(String string) {
return StringEncoding.encode(string);
}
public static String decode(byte[] bytes) {
return StringEncoding.decode(bytes);
}
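    /*
     * A typical write path through a session (a usage sketch; the table name
     * "vertex" below is only illustrative):
     *
     *   Session session = sessions.session();
     *   session.put("vertex", key, value);   // buffered in the WriteBatch
     *   session.commit();                    // flushes the batch to RocksDB
     */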
    /**
     * StdSession implementation for RocksDB
     */
private final class StdSession extends RocksDBSessions.Session {
private WriteBatch batch;
private final WriteOptions writeOptions;
public StdSession(HugeConfig conf) {
this.batch = new WriteBatch();
this.writeOptions = new WriteOptions();
            /*
             * When working under raft mode, if the store crashes, the
             * state-machine can be restored from snapshot + raft log,
             * so WAL and sync are not needed
             */
boolean raftMode = conf.get(CoreOptions.RAFT_MODE);
if (raftMode) {
this.writeOptions.setDisableWAL(true);
this.writeOptions.setSync(false);
}
}
@Override
public void open() {
this.opened = true;
}
@Override
public void close() {
assert this.closeable();
this.opened = false;
}
@Override
public boolean closed() {
return !this.opened || !RocksDBStdSessions.this.opened();
}
@Override
public void reset() {
this.batch = new WriteBatch();
}
        /**
         * Whether there are any (uncommitted) changes in the session
         */
@Override
public boolean hasChanges() {
return this.batch.count() > 0;
}
@Override
public String dataPath() {
return RocksDBStdSessions.this.dataPath;
}
@Override
public String walPath() {
return RocksDBStdSessions.this.walPath;
}
        /**
         * Get a property value by name from the specified table
         */
@Override
public String property(String table, String property) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
return rocksdb().getProperty(cf.get(), property);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
@Override
public Pair<byte[], byte[]> keyRange(String table) {
byte[] startKey;
byte[] endKey;
try (OpenedRocksDB.CFHandle cf = cf(table);
RocksIterator iter = rocksdb().newIterator(cf.get())) {
iter.seekToFirst();
if (!iter.isValid()) {
return null;
}
startKey = iter.key();
iter.seekToLast();
if (!iter.isValid()) {
return Pair.of(startKey, null);
}
endKey = iter.key();
}
return Pair.of(startKey, endKey);
}
@Override
public void compactRange(String table) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
// Waits while compaction is performed on the background threads
rocksdb().compactRange(cf.get());
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
        /**
         * Commit all updates (put/delete) to the DB
         */
@Override
public Integer commit() {
int count = this.batch.count();
if (count <= 0) {
return 0;
}
try {
rocksdb().write(this.writeOptions, this.batch);
} catch (RocksDBException e) {
//this.batch.rollbackToSavePoint();
throw new BackendException(e);
}
            // Clear the batch if write() succeeded (it's retained if write failed)
this.batch.clear();
return count;
}
        /**
         * Rollback all updates (put/delete) that have not been committed
         */
@Override
public void rollback() {
this.batch.clear();
}
/**
* Add a KV record to a table
*/
@Override
public void put(String table, byte[] key, byte[] value) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
this.batch.put(cf.get(), key, value);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
        /**
         * Merge a record into an existing key in a table
         * For more details about the merge operator:
         * https://github.com/facebook/rocksdb/wiki/merge-operator
         */
@Override
public void merge(String table, byte[] key, byte[] value) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
this.batch.merge(cf.get(), key, value);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
        /**
         * Merge a record into an existing key in a table and apply it
         * immediately (written directly to the DB, bypassing the write batch)
         */
@Override
public void increase(String table, byte[] key, byte[] value) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
rocksdb().merge(cf.get(), key, value);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
/**
* Delete a record by key from a table
*/
@Override
public void delete(String table, byte[] key) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
this.batch.delete(cf.get(), key);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
        /**
         * Delete the single version of a record by key from a table
         * NOTE: requires that the key exists and has not been overwritten
         */
@Override
public void deleteSingle(String table, byte[] key) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
this.batch.singleDelete(cf.get(), key);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
        /**
         * Delete records by key prefix from a table (the key itself and all
         * keys starting with it)
         */
@Override
public void deletePrefix(String table, byte[] key) {
byte[] keyFrom = key;
byte[] keyTo = Arrays.copyOf(keyFrom, keyFrom.length);
BinarySerializer.increaseOne(keyTo);
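            /*
             * increaseOne() turns the prefix into an exclusive upper bound,
             * e.g. a prefix of {0x61, 0x62} becomes {0x61, 0x63} (illustrative
             * values), so deleteRange() removes exactly the keys starting
             * with the prefix
             */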
try (OpenedRocksDB.CFHandle cf = cf(table)) {
this.batch.deleteRange(cf.get(), keyFrom, keyTo);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
/**
* Delete a range of keys from a table
*/
@Override
public void deleteRange(String table, byte[] keyFrom, byte[] keyTo) {
try (OpenedRocksDB.CFHandle cf = cf(table)) {
this.batch.deleteRange(cf.get(), keyFrom, keyTo);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
/**
* Get a record by key from a table
*/
@Override
public byte[] get(String table, byte[] key) {
assert !this.hasChanges();
try (OpenedRocksDB.CFHandle cf = cf(table)) {
return rocksdb().get(cf.get(), key);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
/**
* Get records by a list of keys from a table
*/
@Override
public BackendColumnIterator get(String table, List<byte[]> keys) {
assert !this.hasChanges();
try (OpenedRocksDB.CFHandle cf = cf(table)) {
// Fill ColumnFamilyHandle list
List<ColumnFamilyHandle> cfs = new ArrayList<>(keys.size());
ColumnFamilyHandle cfh = cf.get();
for (int i = 0; i < keys.size(); i++) {
cfs.add(cfh);
}
                /*
                 * Do multi-get
                 * NOTE: multiGetAsList() is just the consistent version;
                 * the batched version with io_uring support for better
                 * performance is not ready yet, see #9224
                 */
List<byte[]> values = rocksdb().multiGetAsList(cfs, keys);
return new MgetIterator(keys, values);
} catch (RocksDBException e) {
throw new BackendException(e);
}
}
/**
* Scan all records from a table
*/
@Override
public BackendColumnIterator scan(String table) {
assert !this.hasChanges();
try (OpenedRocksDB.CFHandle cf = cf(table)) {
ReusedRocksIterator iter = cf.newIterator();
return new ScanIterator(table, iter, null, null, SCAN_ANY);
}
}
/**
* Scan records by key prefix from a table
*/
@Override
public BackendColumnIterator scan(String table, byte[] prefix) {
assert !this.hasChanges();
/*
             * NOTE: Options.prefix_extractor is a prerequisite for
             * optimized prefix seek; if Options.prefix_extractor is enabled,
             * we can setPrefixSameAsStart(true) or setAutoPrefixMode(true):
* ReadOptions options = new ReadOptions();
* options.setPrefixSameAsStart(true);
* or
* options.setAutoPrefixMode(true);
* options.setIterateUpperBound(prefix + 1);
*/
try (OpenedRocksDB.CFHandle cf = cf(table)) {
ReusedRocksIterator iter = cf.newIterator();
return new ScanIterator(table, iter, prefix, null,
SCAN_PREFIX_BEGIN);
}
}
/**
* Scan records by key range from a table
*/
@Override
public BackendColumnIterator scan(String table, byte[] keyFrom,
byte[] keyTo, int scanType) {
assert !this.hasChanges();
/*
             * NOTE: if Options.prefix_extractor is enabled, we need to
             * setTotalOrderSeek(true) or setAutoPrefixMode(true) to make
             * page-seek or shard-scan return the right results:
* ReadOptions options = new ReadOptions();
* options.setTotalOrderSeek(true);
* or
* options.setAutoPrefixMode(true);
* options.setIterateUpperBound(keyTo);
*/
try (OpenedRocksDB.CFHandle cf = cf(table)) {
ReusedRocksIterator iter = cf.newIterator();
return new ScanIterator(table, iter, keyFrom, keyTo, scanType);
}
}
}
    /**
     * A wrapper for RocksIterator that converts RocksDB results to a standard Iterator
     */
private static class ScanIterator implements BackendColumnIterator,
Countable {
private final String table;
private final ReusedRocksIterator reusedIter;
private final RocksIterator iter;
private final byte[] keyBegin;
private final byte[] keyEnd;
private final int scanType;
private byte[] position;
private boolean matched;
public ScanIterator(String table, ReusedRocksIterator reusedIter,
byte[] keyBegin, byte[] keyEnd, int scanType) {
E.checkNotNull(reusedIter, "reusedIter");
this.table = table;
this.reusedIter = reusedIter;
this.iter = reusedIter.iterator();
this.keyBegin = keyBegin;
this.keyEnd = keyEnd;
this.scanType = scanType;
this.position = keyBegin;
this.matched = false;
this.checkArguments();
//this.dump();
this.seek();
}
private void checkArguments() {
E.checkArgument(!(this.match(Session.SCAN_PREFIX_BEGIN) &&
this.match(Session.SCAN_PREFIX_END)),
"Can't set SCAN_PREFIX_WITH_BEGIN and " +
"SCAN_PREFIX_WITH_END at the same time");
E.checkArgument(!(this.match(Session.SCAN_PREFIX_BEGIN) &&
this.match(Session.SCAN_GT_BEGIN)),
"Can't set SCAN_PREFIX_WITH_BEGIN and " +
"SCAN_GT_BEGIN/SCAN_GTE_BEGIN at the same time");
E.checkArgument(!(this.match(Session.SCAN_PREFIX_END) &&
this.match(Session.SCAN_LT_END)),
"Can't set SCAN_PREFIX_WITH_END and " +
"SCAN_LT_END/SCAN_LTE_END at the same time");
if (this.match(Session.SCAN_PREFIX_BEGIN)) {
E.checkArgument(this.keyBegin != null,
"Parameter `keyBegin` can't be null " +
"if set SCAN_PREFIX_WITH_BEGIN");
E.checkArgument(this.keyEnd == null,
"Parameter `keyEnd` must be null " +
"if set SCAN_PREFIX_WITH_BEGIN");
}
if (this.match(Session.SCAN_PREFIX_END)) {
E.checkArgument(this.keyEnd != null,
"Parameter `keyEnd` can't be null " +
"if set SCAN_PREFIX_WITH_END");
}
if (this.match(Session.SCAN_GT_BEGIN)) {
E.checkArgument(this.keyBegin != null,
"Parameter `keyBegin` can't be null " +
"if set SCAN_GT_BEGIN or SCAN_GTE_BEGIN");
}
if (this.match(Session.SCAN_LT_END)) {
E.checkArgument(this.keyEnd != null,
"Parameter `keyEnd` can't be null " +
"if set SCAN_LT_END or SCAN_LTE_END");
}
}
private boolean match(int expected) {
return Session.matchScanType(expected, this.scanType);
}
        /**
         * Just for debugging
         */
@SuppressWarnings("unused")
private void dump() {
this.seek();
LOG.info(">>>> scan from {}: {}{}",
this.table,
this.keyBegin == null ? "*" : StringEncoding.format(this.keyBegin),
this.iter.isValid() ? "" : " - No data");
for (; this.iter.isValid(); this.iter.next()) {
LOG.info("{}={}",
StringEncoding.format(this.iter.key()),
StringEncoding.format(this.iter.value()));
}
}
@Override
public boolean hasNext() {
this.matched = this.iter.isOwningHandle();
if (!this.matched) {
// Maybe closed
return this.matched;
}
this.matched = this.iter.isValid();
if (this.matched) {
// Update position for paging
this.position = this.iter.key();
// Do filter if not SCAN_ANY
if (!this.match(Session.SCAN_ANY)) {
this.matched = this.filter(this.position);
}
}
if (!this.matched) {
// The end
this.position = null;
// Free the iterator if finished
this.close();
}
return this.matched;
}
private void seek() {
if (this.keyBegin == null || this.keyBegin.length <= 0) {
// Seek to the first if no `keyBegin`
this.iter.seekToFirst();
} else {
/*
* Seek to `keyBegin`:
* if set SCAN_GT_BEGIN/SCAN_GTE_BEGIN (key > / >= 'xx')
* or if set SCAN_PREFIX_WITH_BEGIN (key prefix with 'xx')
*/
this.iter.seek(this.keyBegin);
// Skip `keyBegin` if set SCAN_GT_BEGIN (key > 'xx')
if (this.match(Session.SCAN_GT_BEGIN) &&
!this.match(Session.SCAN_GTE_BEGIN)) {
while (this.iter.isValid() &&
Bytes.equals(this.iter.key(), this.keyBegin)) {
this.iter.next();
}
}
}
}
private boolean filter(byte[] key) {
if (this.match(Session.SCAN_PREFIX_BEGIN)) {
/*
* Prefix with `keyBegin`?
* TODO: use custom prefix_extractor instead
* or use ReadOptions.prefix_same_as_start
*/
return Bytes.prefixWith(key, this.keyBegin);
} else if (this.match(Session.SCAN_PREFIX_END)) {
/*
* Prefix with `keyEnd`?
* like the following query for range index:
* key > 'age:20' and prefix with 'age'
*/
assert this.keyEnd != null;
return Bytes.prefixWith(key, this.keyEnd);
} else if (this.match(Session.SCAN_LT_END)) {
                /*
                 * Less than (or equal to) `keyEnd`?
                 * NOTE: don't use BytewiseComparator due to signed bytes
                 */
assert this.keyEnd != null;
if (this.match(Session.SCAN_LTE_END)) {
                    // Just compare the prefix, there may be an excess tail after it
key = Arrays.copyOfRange(key, 0, this.keyEnd.length);
return Bytes.compare(key, this.keyEnd) <= 0;
} else {
return Bytes.compare(key, this.keyEnd) < 0;
}
} else {
assert this.match(Session.SCAN_ANY) ||
this.match(Session.SCAN_GT_BEGIN) ||
this.match(Session.SCAN_GTE_BEGIN) :
"Unknow scan type";
return true;
}
}
@Override
public BackendColumn next() {
if (!this.matched) {
if (!this.hasNext()) {
throw new NoSuchElementException();
}
}
BackendColumn col = BackendColumn.of(this.iter.key(),
this.iter.value());
this.iter.next();
this.matched = false;
return col;
}
@Override
public long count() {
long count = 0L;
while (this.hasNext()) {
this.iter.next();
this.matched = false;
count++;
BackendEntryIterator.checkInterrupted();
}
return count;
}
@Override
public byte[] position() {
return this.position;
}
@Override
public void close() {
this.reusedIter.close();
}
}
private static class MgetIterator implements BackendColumnIterator {
private final List<byte[]> keys;
private final List<byte[]> values;
private int current;
private byte[] currentValue;
public MgetIterator(List<byte[]> keys, List<byte[]> values) {
E.checkNotEmpty(keys, "keys");
E.checkNotEmpty(values, "values");
E.checkArgument(keys.size() == values.size(),
"Expect the same size between keys and values");
this.keys = keys;
this.values = values;
this.current = 0;
this.currentValue = null;
}
@Override
public void close() {
// pass
}
@Override
public byte[] position() {
return null;
}
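        /**
         * Advance to the next non-null value; multiGetAsList() returns null
         * for keys that don't exist, and such entries are skipped here.
         */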
@Override
public boolean hasNext() {
for (; this.current < this.values.size(); this.current++) {
this.currentValue = this.values.get(this.current);
if (this.currentValue != null) {
return true;
}
}
return false;
}
@Override
public BackendColumn next() {
if (this.currentValue == null) {
if (!this.hasNext()) {
throw new NoSuchElementException();
}
}
byte[] key = this.keys.get(this.current++);
byte[] value = this.currentValue;
this.currentValue = null;
return BackendColumn.of(key, value);
}
}
}