blob: 2dba5fa7666589b87c16b9ff758720995c17925c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.rocksdb;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.MergeIterator;
import org.apache.hugegraph.backend.store.AbstractBackendStore;
import org.apache.hugegraph.backend.store.BackendAction;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendSessionPool;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.BackendTable;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.exception.ConnectionException;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.util.Consumers;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.ImmutableList;
public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.Session> {
// Logger shared by this store and its nested store classes
private static final Logger LOG = Log.logger(RocksDBStore.class);
// Feature descriptor returned by features()
private static final BackendFeatures FEATURES = new RocksDBFeatures();
// Store name, returned by store() and appended to data paths in wrapPath()
private final String store;
// Database name, returned by database()
private final String database;
// Provider passed to the constructor, returned by provider()
private final BackendStoreProvider provider;
// Tables registered per HugeType
private final Map<HugeType, RocksDBTable> tables;
// OLAP tables registered by name
private final Map<String, RocksDBTable> olapTables;
// Value of RocksDBOptions.DATA_PATH read in open()
private String dataPath;
// Sessions of the base data path; null until open() succeeds
private RocksDBSessions sessions;
// HugeType -> wrapped disk path, filled by parseTableDiskMapping()
private final Map<HugeType, String> tableDiskMapping;
// Data path -> RocksDB sessions mapping
private final ConcurrentMap<String, RocksDBSessions> dbs;
// Lock handed out by storeLock(); read lock for data ops, write lock for init/clear/truncate
private final ReadWriteLock storeLock;
// Key used for the base data path in reportDiskMapping()/findDiskTableKeyByPath()
private static final String TABLE_GENERAL_KEY = "general";
// Name pattern passed to the thread pool created in open()
private static final String DB_OPEN = "db-open-%s";
// Seconds to wait for the db-open thread pool to terminate
private static final long OPEN_TIMEOUT = 600L;
/*
 * Number of threads used to open RocksDB instances concurrently.
 * 8 is supposed to be enough given the number of configurable data
 * disks and the number of disks on one machine.
 */
private static final int OPEN_POOL_THREADS = 8;
// Set in open(): whether this store's name equals CoreOptions.STORE_GRAPH
private boolean isGraphStore;
/**
 * Creates a store that holds no RocksDB sessions yet; they are created
 * later by {@code open(HugeConfig)}.
 *
 * @param provider the provider returned by {@code provider()}
 * @param database the database name returned by {@code database()}
 * @param store    the store name returned by {@code store()}
 */
public RocksDBStore(final BackendStoreProvider provider,
                    final String database, final String store) {
    // Identity of this store
    this.provider = provider;
    this.database = database;
    this.store = store;

    // Table registries, filled through registerTableManager()
    this.tables = new HashMap<>();
    this.olapTables = new HashMap<>();

    // Session state, nothing is opened at construction time
    this.sessions = null;
    this.dbs = new ConcurrentHashMap<>();
    this.tableDiskMapping = new HashMap<>();
    this.storeLock = new ReentrantReadWriteLock();

    this.registerMetaHandlers();
}
/**
 * Registers the "metrics" and "compact" meta handlers. Both build a
 * RocksDBMetrics over the base sessions plus every db returned by
 * tableDBMapping(), evaluated at the time the handler is invoked.
 */
private void registerMetaHandlers() {
    Supplier<List<RocksDBSessions>> allDbs = () -> {
        List<RocksDBSessions> collected = new ArrayList<>();
        // Base sessions first, then the dbs of optimized disks
        collected.add(this.sessions);
        collected.addAll(this.tableDBMapping().values());
        return collected;
    };

    this.registerMetaHandler("metrics", (session, meta, args) ->
        new RocksDBMetrics(allDbs.get(), session).metrics());

    this.registerMetaHandler("compact", (session, meta, args) ->
        new RocksDBMetrics(allDbs.get(), session).compact());
}
/**
 * Registers the table that serves the given type, replacing any table
 * previously registered for it.
 */
protected void registerTableManager(HugeType type, RocksDBTable table) {
    this.tables.put(type, table);
}
/**
 * Registers an OLAP table under the given name, replacing any table
 * previously registered under it.
 */
protected void registerTableManager(String name, RocksDBTable table) {
    this.olapTables.put(name, table);
}
/**
 * Removes the OLAP table registered under the given name, if any.
 */
protected void unregisterTableManager(String name) {
    this.olapTables.remove(name);
}
/**
 * Looks up the table registered for the given type.
 *
 * @throws BackendException if no table is registered for {@code type}
 */
@Override
protected final RocksDBTable table(HugeType type) {
    RocksDBTable found = this.tables.get(type);
    if (found != null) {
        return found;
    }
    throw new BackendException("Unsupported table: '%s'", type);
}
/**
 * Looks up the OLAP table registered under the given name.
 *
 * @throws BackendException if no OLAP table is registered under {@code name}
 */
protected final RocksDBTable table(String name) {
    RocksDBTable found = this.olapTables.get(name);
    if (found != null) {
        return found;
    }
    throw new BackendException("Unsupported table: '%s'", name);
}
/**
 * Returns the names of all registered tables: the per-type tables
 * followed by the OLAP tables. The list is freshly built on each call.
 */
protected List<String> tableNames() {
    List<String> names = new ArrayList<>();
    for (RocksDBTable registered : this.tables.values()) {
        names.add(registered.table());
    }
    names.addAll(this.olapTables());
    return names;
}
/**
 * Returns the names of all registered OLAP tables as a fresh list.
 */
protected List<String> olapTables() {
    List<String> names = new ArrayList<>();
    for (RocksDBTable olapTable : this.olapTables.values()) {
        names.add(olapTable.table());
    }
    return names;
}
/**
 * Returns the table names behind the given type: every OLAP table for
 * HugeType.OLAP, otherwise the single table registered for the type.
 */
protected List<String> tableNames(HugeType type) {
    if (type == HugeType.OLAP) {
        return this.olapTables();
    }
    String name = this.table(type).table();
    return Collections.singletonList(name);
}
/**
 * @return the store name given at construction
 */
@Override
public String store() {
    return this.store;
}
/**
 * @return the database name given at construction
 */
@Override
public String database() {
    return this.database;
}
/**
 * @return the provider given at construction
 */
@Override
public BackendStoreProvider provider() {
    return this.provider;
}
/**
 * @return the shared RocksDBFeatures instance
 */
@Override
public BackendFeatures features() {
    return FEATURES;
}
/**
 * Opens the RocksDB instance(s) backing this store.
 *
 * If the base sessions already exist and are not closed, only
 * useSessions() is called. Otherwise the base data path and each distinct
 * disk configured in RocksDBOptions.DATA_DISKS for this store are opened
 * concurrently on a temporary thread pool, which is shut down afterwards
 * whether or not opening succeeded.
 *
 * @param config the configuration to read paths and options from, not null
 */
@Override
public synchronized void open(HugeConfig config) {
LOG.debug("Store open: {}", this.store);
E.checkNotNull(config, "config");
String graphStore = config.get(CoreOptions.STORE_GRAPH);
this.isGraphStore = this.store.equals(graphStore);
this.dataPath = config.get(RocksDBOptions.DATA_PATH);
if (this.sessions != null && !this.sessions.closed()) {
LOG.debug("Store {} has been opened before", this.store);
this.useSessions();
return;
}
List<Future<?>> futures = new ArrayList<>();
ExecutorService openPool = ExecutorUtil.newFixedThreadPool(
OPEN_POOL_THREADS, DB_OPEN);
// Open base disk
futures.add(openPool.submit(() -> {
this.sessions = this.open(config, this.tableNames());
}));
// Open tables with optimized disk
Map<String, String> disks = config.getMap(RocksDBOptions.DATA_DISKS);
// Disks already submitted, so a disk shared by several tables is opened once
Set<String> openedDisks = new HashSet<>();
if (!disks.isEmpty()) {
this.parseTableDiskMapping(disks, this.dataPath);
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String disk = e.getValue();
if (openedDisks.contains(disk)) {
continue;
}
openedDisks.add(disk);
List<String> tables = this.tableNames(e.getKey());
futures.add(openPool.submit(() -> {
// The disk path is used as both data path and WAL path
this.open(config, disk, disk, tables);
}));
}
}
try {
this.waitOpenFinished(futures);
} finally {
// Always release the pool, even when an open task failed
this.shutdownOpenPool(openPool);
}
}
/**
 * Blocks until every open task has finished, in submission order.
 *
 * @throws ConnectionException if a task failed and the failure's cause is
 *                             a ConnectionException
 * @throws BackendException    for any other failure while waiting
 */
private void waitOpenFinished(List<Future<?>> futures) {
    final String error = "Failed to open RocksDB store";
    for (Future<?> pending : futures) {
        try {
            pending.get();
        } catch (Throwable e) {
            boolean connectionFailure =
                    e.getCause() instanceof ConnectionException;
            if (connectionFailure) {
                throw new ConnectionException(error, e);
            }
            throw new BackendException(error, e);
        }
    }
}
/**
 * Closes the sessions held by the db-open threads and shuts the pool down.
 *
 * Does nothing if the pool is already shut down. If the calling thread is
 * interrupted, the pool is stopped with shutdownNow(), the interrupt flag
 * is restored and a BackendException carrying the original cause is thrown.
 *
 * @param openPool the pool that was used to open the RocksDB instances
 * @throws BackendException if closing the sessions or waiting for
 *                          termination fails
 */
private void shutdownOpenPool(ExecutorService openPool) {
    if (openPool.isShutdown()) {
        return;
    }

    /*
     * Transfer the session holder from db-open thread to main thread,
     * otherwise once the db-open thread pool is closed, we can no longer
     * close the session created by it, which will cause the rocksdb
     * instance fail to close
     */
    this.useSessions();
    try {
        Consumers.executeOncePerThread(openPool, OPEN_POOL_THREADS,
                                       this::closeSessions);
    } catch (InterruptedException e) {
        // Don't leave the worker threads running behind the failure
        openPool.shutdownNow();
        // Keep the interruption visible to the caller
        Thread.currentThread().interrupt();
        throw new BackendException("Failed to close session opened by " +
                                   "open-pool", e);
    }

    boolean terminated;
    openPool.shutdown();
    try {
        terminated = openPool.awaitTermination(OPEN_TIMEOUT,
                                               TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Stop waiting, force the pool down and preserve the interrupt
        openPool.shutdownNow();
        Thread.currentThread().interrupt();
        throw new BackendException(
                  "Failed to wait db-open thread pool shutdown", e);
    } catch (Throwable e) {
        throw new BackendException(
                  "Failed to wait db-open thread pool shutdown", e);
    }
    if (!terminated) {
        LOG.warn("Timeout when waiting db-open thread pool shutdown");
    }
    // Force-stop whatever is still running after the timeout
    openPool.shutdownNow();
}
/**
 * Opens the base RocksDB of this store, using the configured data and WAL
 * paths (each wrapped by wrapPath()).
 */
protected RocksDBSessions open(HugeConfig config, List<String> tableNames) {
    String configuredData = config.get(RocksDBOptions.DATA_PATH);
    String dataPath = this.wrapPath(configuredData);
    String configuredWal = config.get(RocksDBOptions.WAL_PATH);
    String walPath = this.wrapPath(configuredWal);
    return this.open(config, dataPath, walPath, tableNames);
}
/**
 * Opens a RocksDB session pool on the given data path and records it in
 * the data-path mapping.
 *
 * When the first attempt fails, two fallbacks are tried depending on the
 * error message: copying an already-registered sessions object for the
 * same path ("No locks available"), and re-opening without the requested
 * column families ("Column family not found").
 *
 * @param config     the configuration passed to the session pool
 * @param dataPath   the RocksDB data directory
 * @param walPath    the RocksDB WAL directory
 * @param tableNames the tables (column families) to open
 * @return the opened sessions
 * @throws ConnectionException if every attempt to open failed
 */
protected RocksDBSessions open(HugeConfig config, String dataPath,
String walPath, List<String> tableNames) {
LOG.info("Opening RocksDB with data path: {}", dataPath);
RocksDBSessions sessions = null;
try {
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
// Fallback 1: the path may already be opened by this store
RocksDBSessions origin = this.dbs.get(dataPath);
if (origin != null) {
if (e.getMessage().contains("No locks available")) {
/*
* Open twice, copy a RocksDBSessions reference, since from
* v0.11.2 release we don't support multi graphs share
* rocksdb instance (before v0.11.2 graphs with different
* CF-prefix share one rocksdb instance and data path),
* so each graph has its independent data paths, but multi
* CFs may share same optimized disk(or optimized disk path).
*/
sessions = origin.copy(config, this.database, this.store);
}
}
// Fallback 2: the requested column families don't exist yet
if (e.getMessage().contains("Column family not found")) {
if (this.isSchemaStore()) {
LOG.info("Failed to open RocksDB '{}' with database '{}'," +
" try to init CF later", dataPath, this.database);
}
List<String> none;
boolean existsOtherKeyspace = existsOtherKeyspace(dataPath);
if (existsOtherKeyspace) {
// Open a keyspace after other keyspace closed
// Set to empty list to open old CFs(of other keyspace)
none = ImmutableList.of();
} else {
// Before init the first keyspace
none = null;
}
try {
sessions = this.openSessionPool(config, dataPath,
walPath, none);
} catch (RocksDBException e1) {
// Report the retry's failure instead of the first one
e = e1;
}
if (sessions == null && !existsOtherKeyspace) {
LOG.error("Failed to open RocksDB with default CF, " +
"is there data for other programs: {}", dataPath);
}
}
if (sessions == null) {
// Error after trying other ways
LOG.error("Failed to open RocksDB '{}'", dataPath, e);
throw new ConnectionException("Failed to open RocksDB '%s'",
e, dataPath);
}
}
if (sessions != null) {
// May override the original session pool
this.dbs.put(dataPath, sessions);
sessions.session().open();
LOG.debug("Store opened: {}", dataPath);
}
return sessions;
}
/**
 * Creates the RocksDBStdSessions for the given paths. A null
 * {@code tableNames} selects the constructor that takes no table list.
 *
 * @throws RocksDBException if the sessions can't be created
 */
protected RocksDBSessions openSessionPool(HugeConfig config,
                                          String dataPath, String walPath,
                                          List<String> tableNames)
                                          throws RocksDBException {
    if (tableNames != null) {
        return new RocksDBStdSessions(config, this.database, this.store,
                                      dataPath, walPath, tableNames);
    }
    return new RocksDBStdSessions(config, this.database, this.store,
                                  dataPath, walPath);
}
/**
 * Creates the directory {@code path} if needed and returns that path
 * joined with the store name.
 *
 * @throws BackendException if the directory can't be created
 */
protected String wrapPath(String path) {
    // Make sure the parent directory is there before RocksDB uses it
    try {
        FileUtils.forceMkdir(FileUtils.getFile(path));
    } catch (IOException e) {
        throw new BackendException(e.getMessage(), e);
    }
    // Each store gets its own sub-directory
    Path storePath = Paths.get(path, this.store);
    return storePath.toString();
}
/**
 * Builds a mapping from table name to the RocksDB sessions of its
 * optimized disk. OLAP is keyed by HugeType.OLAP.string() rather than by
 * a table name.
 */
protected Map<String, RocksDBSessions> tableDBMapping() {
    Map<String, RocksDBSessions> mapping = InsertionOrderUtil.newMap();
    this.tableDiskMapping.forEach((type, disk) -> {
        RocksDBSessions db = this.db(disk);
        String key;
        if (type == HugeType.OLAP) {
            key = type.string();
        } else {
            key = this.table(type).table();
        }
        mapping.put(key, db);
    });
    return mapping;
}
/**
 * @return the read-write lock guarding this store's operations
 */
protected ReadWriteLock storeLock() {
    return this.storeLock;
}
/**
 * Closes every sessions object tracked by this store, after checking
 * that the store is opened.
 */
@Override
public void close() {
    LOG.debug("Store close: {}", this.store);

    this.checkOpened();
    this.closeSessions();
}
/**
 * Reports whether the current session of the base sessions is opened.
 * Fails via checkDbOpened() if the base sessions are missing or closed.
 */
@Override
public boolean opened() {
    this.checkDbOpened();
    RocksDBSessions.Session current = this.sessions.session();
    return current.opened();
}
/**
 * Applies every action of the mutation to its table, under the store's
 * read lock. The session is resolved once per mutated type.
 */
@Override
public void mutate(BackendMutation mutation) {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        this.checkOpened();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Store {} mutation: {}", this.store, mutation);
        }

        for (HugeType type : mutation.types()) {
            RocksDBSessions.Session session = this.session(type);
            Iterator<BackendAction> actions = mutation.mutation(type);
            while (actions.hasNext()) {
                this.mutate(session, actions.next());
            }
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Applies one action to the table its entry belongs to. OLAP entries are
 * routed to the OLAP table and the OLAP session instead of the session
 * passed in.
 */
private void mutate(RocksDBSessions.Session session, BackendAction item) {
    BackendEntry entry = item.entry();

    RocksDBTable target;
    RocksDBSessions.Session targetSession = session;
    if (entry.olap()) {
        // Olap index tables are named by type, olap vertex tables by sub id
        String olapName = entry.type().isIndex() ?
                          this.olapTableName(entry.type()) :
                          this.olapTableName(entry.subId());
        target = this.table(olapName);
        targetSession = this.session(HugeType.OLAP);
    } else {
        // Oltp table
        target = this.table(entry.type());
    }

    switch (item.action()) {
        case INSERT:
            target.insert(targetSession, entry);
            break;
        case DELETE:
            target.delete(targetSession, entry);
            break;
        case APPEND:
            target.append(targetSession, entry);
            break;
        case ELIMINATE:
            target.eliminate(targetSession, entry);
            break;
        case UPDATE_IF_PRESENT:
            target.updateIfPresent(targetSession, entry);
            break;
        case UPDATE_IF_ABSENT:
            target.updateIfAbsent(targetSession, entry);
            break;
        default:
            throw new AssertionError(String.format(
                      "Unsupported mutate action: %s", item.action()));
    }
}
/**
 * Runs the query against its table under the store's read lock. For the
 * graph store, results of every OLAP property key named by the query are
 * merged into the main result.
 */
@Override
public Iterator<BackendEntry> query(Query query) {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        this.checkOpened();

        HugeType type = RocksDBTable.tableType(query);
        boolean olap = query.olap();
        RocksDBTable table = olap ?
                             this.table(this.olapTableName(type)) :
                             this.table(type);
        RocksDBSessions.Session session =
                this.session(olap ? HugeType.OLAP : type);
        Iterator<BackendEntry> entries = table.query(session, query);

        // Merge olap results as needed
        Set<Id> olapPks = query.olapPks();
        if (!this.isGraphStore || olapPks.isEmpty()) {
            return entries;
        }
        List<Iterator<BackendEntry>> olapResults = new ArrayList<>();
        for (Id pk : olapPks) {
            Query copied = query.copy();
            RocksDBTable olapTable = this.table(this.olapTableName(pk));
            olapResults.add(olapTable.query(this.session(HugeType.OLAP),
                                            copied));
        }
        return new MergeIterator<>(entries, olapResults,
                                   BackendEntry::mergeable);
    } finally {
        lock.unlock();
    }
}
/**
 * Runs a number query against the table of the query's type, under the
 * store's read lock.
 */
@Override
public Number queryNumber(Query query) {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        this.checkOpened();

        HugeType type = RocksDBTable.tableType(query);
        RocksDBTable target = this.table(type);
        RocksDBSessions.Session session = this.session(type);
        return target.queryNumber(session, query);
    } finally {
        lock.unlock();
    }
}
/**
 * Creates all tables of this store under the write lock: every table on
 * the base db, then the mapped tables on their optimized disks.
 */
@Override
public synchronized void init() {
    Lock lock = this.storeLock.writeLock();
    lock.lock();
    try {
        this.checkDbOpened();

        // Create tables with main disk
        String[] allTables = this.tableNames().toArray(new String[0]);
        this.createTable(this.sessions, allTables);

        // Create table with optimized disk
        Map<String, RocksDBSessions> diskDbs = this.tableDBMapping();
        for (Map.Entry<String, RocksDBSessions> mapped : diskDbs.entrySet()) {
            String name = mapped.getKey();
            RocksDBSessions db = mapped.getValue();
            if (!name.equals(HugeType.OLAP.string())) {
                this.createTable(db, name);
                continue;
            }
            // The OLAP key stands for every registered olap table
            for (String olapTable : this.olapTables()) {
                this.createTable(db, olapTable);
            }
        }

        LOG.debug("Store initialized: {}", this.store);
    } finally {
        lock.unlock();
    }
}
/**
 * Creates the given tables in {@code db}.
 *
 * @throws BackendException wrapping the RocksDBException on failure
 */
protected void createTable(RocksDBSessions db, String... tables) {
    try {
        db.createTable(tables);
    } catch (RocksDBException cause) {
        List<String> names = Arrays.asList(tables);
        throw new BackendException("Failed to create tables %s for '%s'",
                                   cause, names, this.store);
    }
}
/**
 * Drops all tables of this store under the write lock: every table on
 * the base db, then the mapped tables on their optimized disks.
 * The {@code clearSpace} flag is not used by this implementation.
 */
@Override
public synchronized void clear(boolean clearSpace) {
    Lock lock = this.storeLock.writeLock();
    lock.lock();
    try {
        this.checkDbOpened();

        // Drop tables with main disk
        String[] allTables = this.tableNames().toArray(new String[0]);
        this.dropTable(this.sessions, allTables);

        // Drop tables with optimized disk
        Map<String, RocksDBSessions> diskDbs = this.tableDBMapping();
        for (Map.Entry<String, RocksDBSessions> mapped : diskDbs.entrySet()) {
            String name = mapped.getKey();
            RocksDBSessions db = mapped.getValue();
            if (!name.equals(HugeType.OLAP.string())) {
                this.dropTable(db, name);
                continue;
            }
            // The OLAP key stands for every registered olap table
            for (String olapTable : this.olapTables()) {
                this.dropTable(db, olapTable);
            }
        }

        LOG.debug("Store cleared: {}", this.store);
    } finally {
        lock.unlock();
    }
}
/**
 * Drops the given tables from {@code db}. A BackendException whose
 * message contains "is not opened" is ignored.
 *
 * @throws BackendException for any other failure
 */
protected void dropTable(RocksDBSessions db, String... tables) {
    try {
        db.dropTable(tables);
    } catch (RocksDBException cause) {
        List<String> names = Arrays.asList(tables);
        throw new BackendException("Failed to drop tables %s for '%s'",
                                   cause, names, this.store);
    } catch (BackendException e) {
        boolean notOpened = e.getMessage().contains("is not opened");
        if (!notOpened) {
            throw e;
        }
        // Nothing to drop from a db that is not opened
    }
}
/**
 * Reports whether the store is opened and every table of tableNames()
 * exists in the base db.
 */
@Override
public boolean initialized() {
    this.checkDbOpened();

    if (!this.opened()) {
        return false;
    }
    // Stops at the first missing table
    return this.tableNames().stream()
               .allMatch(name -> this.sessions.existsTable(name));
}
/**
 * Drops and re-creates all tables under the write lock, then force-resets
 * the sessions of every tracked db.
 */
@Override
public synchronized void truncate() {
    Lock lock = this.storeLock.writeLock();
    lock.lock();
    try {
        this.checkOpened();

        this.clear(false);
        this.init();

        // Clear write-batch
        for (RocksDBSessions db : this.dbs.values()) {
            db.forceResetSessions();
        }

        LOG.debug("Store truncated: {}", this.store);
    } finally {
        lock.unlock();
    }
}
/**
 * Starts a transaction. Under the read lock it only asserts (when
 * assertions are enabled) that no session has pending changes.
 */
@Override
public void beginTx() {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        this.checkOpened();

        for (RocksDBSessions.Session current : this.session()) {
            assert !current.hasChanges();
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Commits every session of this store under the read lock, one after
 * another.
 */
@Override
public void commitTx() {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        this.checkOpened();

        // Unable to guarantee atomicity when committing multi sessions
        for (RocksDBSessions.Session current : this.session()) {
            Object committed = current.commit();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Store {} committed {} items", this.store, committed);
            }
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Rolls back every session of this store under the read lock.
 */
@Override
public void rollbackTx() {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        this.checkOpened();

        for (RocksDBSessions.Session current : this.session()) {
            current.rollback();
        }
    } finally {
        lock.unlock();
    }
}
/**
 * Returns the session serving the given table type: the session of its
 * optimized disk when one is mapped, otherwise the base session.
 */
@Override
protected RocksDBSessions.Session session(HugeType tableType) {
    this.checkOpened();

    // Optimized disk
    String disk = this.tableDiskMapping.get(tableType);
    RocksDBSessions db = disk == null ? this.sessions : this.db(disk);
    return db.session();
}
/**
 * Creates a snapshot of every tracked RocksDB instance under the read
 * lock.
 *
 * @param snapshotPrefix prefix of the snapshot directory name
 * @return mapping from snapshot parent directory to the disk table key
 *         found by findDiskTableKeyByPath()
 */
@Override
public Map<String, String> createSnapshot(String snapshotPrefix) {
    Lock lock = this.storeLock.readLock();
    lock.lock();
    try {
        Map<String, String> snapshotDirs = new HashMap<>();
        // Every rocksdb instance should create an snapshot
        for (Map.Entry<String, RocksDBSessions> opened : this.dbs.entrySet()) {
            String dataPath = opened.getKey();
            RocksDBSessions db = opened.getValue();

            // Like: parent_path/rocksdb-data/*, * maybe g,m,s
            Path origin = Paths.get(dataPath).toAbsolutePath();
            Path base = origin.getParent().getParent();
            // Like: rocksdb-data/*
            Path relative = base.relativize(origin);
            // Like: parent_path/snapshot_rocksdb-data/*
            Path snapshot = base.resolve(snapshotPrefix + "_" + relative);
            LOG.debug("Create snapshot '{}' for origin data path '{}'",
                      snapshot, origin);

            db.createSnapshot(snapshot.toString());

            String snapshotDir = snapshot.toAbsolutePath()
                                         .getParent().toString();
            // Find correspond data HugeType key
            String diskTableKey = this.findDiskTableKeyByPath(dataPath);
            snapshotDirs.put(snapshotDir, diskTableKey);
        }
        LOG.info("The store '{}' create snapshot successfully", this);
        return snapshotDirs;
    } finally {
        lock.unlock();
    }
}
/**
 * Resumes every tracked RocksDB instance from its snapshot, under the
 * read lock. Does nothing if the store is not opened.
 *
 * @param snapshotPrefix prefix used to build each snapshot path
 * @param deleteSnapshot if false the snapshot is hard-linked first and the
 *                       link is resumed; if true the snapshot itself is
 *                       resumed and its parent directory is removed once
 *                       it is empty
 * @throws BackendException if resuming or cleaning up fails
 */
@Override
public void resumeSnapshot(String snapshotPrefix, boolean deleteSnapshot) {
    Lock readLock = this.storeLock.readLock();
    readLock.lock();
    try {
        if (!this.opened()) {
            return;
        }
        // Resolve all snapshot paths first, then resume them
        Map<String, RocksDBSessions> snapshotPaths = new HashMap<>();
        for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
            RocksDBSessions sessions = entry.getValue();
            String snapshotPath = sessions.buildSnapshotPath(snapshotPrefix);
            LOG.debug("The origin data path: {}", entry.getKey());
            if (!deleteSnapshot) {
                snapshotPath = sessions.hardLinkSnapshot(snapshotPath);
            }
            LOG.debug("The snapshot data path: {}", snapshotPath);
            snapshotPaths.put(snapshotPath, sessions);
        }

        for (Map.Entry<String, RocksDBSessions> entry :
             snapshotPaths.entrySet()) {
            String snapshotPath = entry.getKey();
            RocksDBSessions sessions = entry.getValue();
            sessions.resumeSnapshot(snapshotPath);

            if (deleteSnapshot) {
                // Delete empty snapshot parent directory
                Path parentPath = Paths.get(snapshotPath)
                                       .toAbsolutePath().getParent();
                boolean empty;
                // Files.list() holds an open directory handle until closed
                try (java.util.stream.Stream<Path> children =
                     Files.list(parentPath)) {
                    empty = children.count() == 0;
                }
                if (empty) {
                    FileUtils.deleteDirectory(parentPath.toFile());
                }
            }
        }
        LOG.info("The store '{}' resume snapshot successfully", this);
    } catch (RocksDBException | IOException e) {
        throw new BackendException("Failed to resume snapshot", e);
    } finally {
        readLock.unlock();
    }
}
/**
 * Calls useSession() on every tracked sessions object.
 */
private void useSessions() {
    Collection<RocksDBSessions> tracked = this.sessions();
    for (RocksDBSessions db : tracked) {
        db.useSession();
    }
}
/**
 * Returns the base session followed by the session of each table mapped
 * to an optimized disk (one entry per mapped table).
 */
private List<RocksDBSessions.Session> session() {
    this.checkOpened();

    RocksDBSessions.Session base = this.sessions.session();
    if (this.tableDiskMapping.isEmpty()) {
        return Collections.singletonList(base);
    }

    // Collect session of each table with optimized disk
    int capacity = this.tableDiskMapping.size() + 1;
    List<RocksDBSessions.Session> all = new ArrayList<>(capacity);
    all.add(base);
    for (String disk : this.tableDiskMapping.values()) {
        all.add(this.db(disk).session());
    }
    return all;
}
/**
 * Closes every tracked sessions object and stops tracking those whose
 * close() reports true.
 */
private void closeSessions() {
    for (Iterator<Map.Entry<String, RocksDBSessions>> it =
         this.dbs.entrySet().iterator(); it.hasNext();) {
        RocksDBSessions db = it.next().getValue();
        if (db.close()) {
            it.remove();
        }
    }
}
/**
 * @return a view of every tracked sessions object
 */
private Collection<RocksDBSessions> sessions() {
    return this.dbs.values();
}
/**
 * Rebuilds the table-to-disk mapping from the configured disks, keeping
 * only the entries whose store part equals this store's name.
 *
 * @param disks    entries like {@code graph/vertex: /path/to/disk1}
 * @param dataPath the base data path, which no disk may equal
 * @throws IllegalArgumentException if the table part is not a HugeType name
 */
private void parseTableDiskMapping(Map<String, String> disks,
                                   String dataPath) {
    this.tableDiskMapping.clear();
    for (Map.Entry<String, String> disk : disks.entrySet()) {
        // The format of `disk` like: `graph/vertex: /path/to/disk1`
        String name = disk.getKey();
        String path = disk.getValue();
        E.checkArgument(!dataPath.equals(path), "Invalid disk path" +
                        "(can't be the same as data_path): '%s'", path);
        E.checkArgument(!name.isEmpty() && !path.isEmpty(),
                        "Invalid disk format: '%s', expect `NAME:PATH`",
                        disk);
        String[] pair = name.split("/", 2);
        E.checkArgument(pair.length == 2,
                        "Invalid disk key format: '%s', " +
                        "expect `STORE/TABLE`", name);
        String store = pair[0].trim();
        /*
         * Upper-case with Locale.ROOT: the default-locale variant maps
         * 'i' to a dotted capital under e.g. Turkish locales, which would
         * make valueOf() reject valid names like `secondary_index`.
         */
        String tableName = pair[1].trim().toUpperCase(java.util.Locale.ROOT);
        HugeType table = HugeType.valueOf(tableName);
        if (this.store.equals(store)) {
            path = this.wrapPath(path);
            this.tableDiskMapping.put(table, path);
        }
    }
}
/**
 * Builds a report of where data lives: the base data path under
 * "general", plus the parent directory of each optimized disk path under
 * "STORE/TYPE".
 */
@SuppressWarnings("unused")
private Map<String, String> reportDiskMapping() {
    Map<String, String> report = new HashMap<>();
    report.put(TABLE_GENERAL_KEY, this.dataPath);
    this.tableDiskMapping.forEach((type, disk) -> {
        String parentDir = Paths.get(disk).toAbsolutePath()
                                .getParent().toString();
        report.put(this.store + "/" + type.name(), parentDir);
    });
    return report;
}
/**
 * Returns "STORE/TYPE" for the first table mapped to the given disk path,
 * or "general" when no table is mapped to it.
 */
private String findDiskTableKeyByPath(String diskPath) {
    for (Map.Entry<HugeType, String> mapped :
         this.tableDiskMapping.entrySet()) {
        if (diskPath.equals(mapped.getValue())) {
            return this.store + "/" + mapped.getKey().name();
        }
    }
    return TABLE_GENERAL_KEY;
}
/**
 * Fails via E.checkState() unless the base sessions exist and are not
 * closed.
 */
private void checkDbOpened() {
    boolean dbOpened = this.sessions != null && !this.sessions.closed();
    E.checkState(dbOpened, "RocksDB has not been opened");
}
/**
 * Returns the sessions serving the given table type: those of its
 * optimized disk when one is mapped, otherwise the base sessions.
 */
protected RocksDBSessions db(HugeType tableType) {
    this.checkOpened();

    // Optimized disk
    String disk = this.tableDiskMapping.get(tableType);
    if (disk == null) {
        return this.sessions;
    }
    return this.db(disk);
}
/**
 * Returns the sessions tracked for the given disk path, failing via
 * E.checkState() if there are none or they are closed.
 */
private RocksDBSessions db(String disk) {
    RocksDBSessions tracked = this.dbs.get(disk);
    boolean usable = tracked != null && !tracked.closed();
    E.checkState(usable, "RocksDB store has not been opened: %s", disk);
    return tracked;
}
/**
 * Guesses whether the data path already holds tables of another keyspace:
 * true once at least 3 column families end with a known schema/index
 * table name. Returns false if the column families can't be listed.
 */
private static boolean existsOtherKeyspace(String dataPath) {
    Set<String> cfs;
    try {
        cfs = RocksDBStdSessions.listCFs(dataPath);
    } catch (RocksDBException ignored) {
        // Can't list column families, treat as no other keyspace
        return false;
    }

    String[] knownSuffixes = {
        RocksDBTables.PropertyKey.TABLE,
        RocksDBTables.VertexLabel.TABLE,
        RocksDBTables.EdgeLabel.TABLE,
        RocksDBTables.IndexLabel.TABLE,
        RocksDBTables.SecondaryIndex.TABLE,
        RocksDBTables.SearchIndex.TABLE,
        RocksDBTables.RangeIntIndex.TABLE,
        RocksDBTables.RangeFloatIndex.TABLE,
        RocksDBTables.RangeLongIndex.TABLE,
        RocksDBTables.RangeDoubleIndex.TABLE
    };

    int hits = 0;
    for (String cf : cfs) {
        boolean known = false;
        for (String suffix : knownSuffixes) {
            if (cf.endsWith(suffix)) {
                known = true;
                break;
            }
        }
        if (known && ++hits >= 3) {
            return true;
        }
    }
    return false;
}
/***************************** Store defines *****************************/
/**
 * Store for schema data: labels, property keys, index labels, their
 * secondary index, plus a counters table.
 */
public static class RocksDBSchemaStore extends RocksDBStore {

    // Counters table, kept outside the per-type table registry
    private final RocksDBTables.Counters counters;

    public RocksDBSchemaStore(BackendStoreProvider provider,
                              String database, String store) {
        super(provider, database, store);

        this.counters = new RocksDBTables.Counters(database);

        this.registerTableManager(HugeType.VERTEX_LABEL,
                                  new RocksDBTables.VertexLabel(database));
        this.registerTableManager(HugeType.EDGE_LABEL,
                                  new RocksDBTables.EdgeLabel(database));
        this.registerTableManager(HugeType.PROPERTY_KEY,
                                  new RocksDBTables.PropertyKey(database));
        this.registerTableManager(HugeType.INDEX_LABEL,
                                  new RocksDBTables.IndexLabel(database));
        this.registerTableManager(HugeType.SECONDARY_INDEX,
                                  new RocksDBTables.SecondaryIndex(database));
    }

    /**
     * Adds the counters table to the registered table names.
     */
    @Override
    protected List<String> tableNames() {
        List<String> names = super.tableNames();
        names.add(this.counters.table());
        return names;
    }

    /**
     * Increases the counter of the given type on the base session, under
     * the read lock.
     */
    @Override
    public void increaseCounter(HugeType type, long increment) {
        Lock lock = super.storeLock.readLock();
        lock.lock();
        try {
            super.checkOpened();
            RocksDBSessions.Session baseSession = super.sessions.session();
            this.counters.increaseCounter(baseSession, type, increment);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads the counter of the given type from the base session, under
     * the read lock.
     */
    @Override
    public long getCounter(HugeType type) {
        Lock lock = super.storeLock.readLock();
        lock.lock();
        try {
            super.checkOpened();
            RocksDBSessions.Session baseSession = super.sessions.session();
            return this.counters.getCounter(baseSession, type);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isSchemaStore() {
        return true;
    }
}
/**
 * Store for graph data: vertices, edges and their indexes, plus OLAP
 * index tables and dynamically created OLAP tables.
 */
public static class RocksDBGraphStore extends RocksDBStore {

    public RocksDBGraphStore(BackendStoreProvider provider,
                             String database, String store) {
        super(provider, database, store);

        // Vertex and edge tables
        this.registerTableManager(HugeType.VERTEX,
                                  new RocksDBTables.Vertex(database));
        this.registerTableManager(HugeType.EDGE_OUT,
                                  RocksDBTables.Edge.out(database));
        this.registerTableManager(HugeType.EDGE_IN,
                                  RocksDBTables.Edge.in(database));

        // Index tables
        this.registerTableManager(HugeType.SECONDARY_INDEX,
                                  new RocksDBTables.SecondaryIndex(database));
        this.registerTableManager(HugeType.VERTEX_LABEL_INDEX,
                                  new RocksDBTables.VertexLabelIndex(database));
        this.registerTableManager(HugeType.EDGE_LABEL_INDEX,
                                  new RocksDBTables.EdgeLabelIndex(database));
        this.registerTableManager(HugeType.RANGE_INT_INDEX,
                                  new RocksDBTables.RangeIntIndex(database));
        this.registerTableManager(HugeType.RANGE_FLOAT_INDEX,
                                  new RocksDBTables.RangeFloatIndex(database));
        this.registerTableManager(HugeType.RANGE_LONG_INDEX,
                                  new RocksDBTables.RangeLongIndex(database));
        this.registerTableManager(HugeType.RANGE_DOUBLE_INDEX,
                                  new RocksDBTables.RangeDoubleIndex(database));
        this.registerTableManager(HugeType.SEARCH_INDEX,
                                  new RocksDBTables.SearchIndex(database));
        this.registerTableManager(HugeType.SHARD_INDEX,
                                  new RocksDBTables.ShardIndex(database));
        this.registerTableManager(HugeType.UNIQUE_INDEX,
                                  new RocksDBTables.UniqueIndex(database));

        // Olap index tables, registered by name
        this.registerTableManager(
             this.olapTableName(HugeType.SECONDARY_INDEX),
             new RocksDBTables.OlapSecondaryIndex(store));
        this.registerTableManager(
             this.olapTableName(HugeType.RANGE_INT_INDEX),
             new RocksDBTables.OlapRangeIntIndex(store));
        this.registerTableManager(
             this.olapTableName(HugeType.RANGE_LONG_INDEX),
             new RocksDBTables.OlapRangeLongIndex(store));
        this.registerTableManager(
             this.olapTableName(HugeType.RANGE_FLOAT_INDEX),
             new RocksDBTables.OlapRangeFloatIndex(store));
        this.registerTableManager(
             this.olapTableName(HugeType.RANGE_DOUBLE_INDEX),
             new RocksDBTables.OlapRangeDoubleIndex(store));
    }

    @Override
    public boolean isSchemaStore() {
        return false;
    }

    @Override
    public Id nextId(HugeType type) {
        throw new UnsupportedOperationException(
                  "RocksDBGraphStore.nextId()");
    }

    @Override
    public void increaseCounter(HugeType type, long num) {
        throw new UnsupportedOperationException(
                  "RocksDBGraphStore.increaseCounter()");
    }

    @Override
    public long getCounter(HugeType type) {
        throw new UnsupportedOperationException(
                  "RocksDBGraphStore.getCounter()");
    }

    /**
     * Registers the OLAP table of the given id, which must already exist
     * in the base db.
     *
     * TODO: can we remove this method since createOlapTable would register?
     */
    @Override
    public void checkAndRegisterOlapTable(Id id) {
        RocksDBTable olapTable = new RocksDBTables.OlapTable(this.store(), id);
        boolean exists = super.sessions.existsTable(olapTable.table());
        if (!exists) {
            throw new HugeException("Not exist table '%s''",
                                    olapTable.table());
        }
        this.registerTableManager(this.olapTableName(id), olapTable);
    }

    /**
     * Creates the OLAP table of the given id in the OLAP db and registers
     * it.
     */
    @Override
    public void createOlapTable(Id id) {
        RocksDBTable olapTable = new RocksDBTables.OlapTable(this.store(), id);
        this.createTable(this.db(HugeType.OLAP), olapTable.table());
        this.registerTableManager(this.olapTableName(id), olapTable);
    }

    /**
     * Empties the OLAP table of the given id by dropping and re-creating
     * it.
     */
    @Override
    public void clearOlapTable(Id id) {
        String name = this.olapTableName(id);
        RocksDBTable olapTable = this.table(name);
        RocksDBSessions db = this.dropExistingOlapTable(name, olapTable);
        this.createTable(db, olapTable.table());
    }

    /**
     * Drops the OLAP table of the given id and unregisters it.
     */
    @Override
    public void removeOlapTable(Id id) {
        String name = this.olapTableName(id);
        RocksDBTable olapTable = this.table(name);
        this.dropExistingOlapTable(name, olapTable);
        this.unregisterTableManager(this.olapTableName(id));
    }

    /**
     * Drops the given table from the OLAP db, which must contain it.
     *
     * @return the OLAP db the table was dropped from
     */
    private RocksDBSessions dropExistingOlapTable(String name,
                                                  RocksDBTable olapTable) {
        RocksDBSessions db = this.db(HugeType.OLAP);
        if (!db.existsTable(olapTable.table())) {
            throw new HugeException("Not exist table '%s''", name);
        }
        this.dropTable(db, olapTable.table());
        return db;
    }
}
/**
 * Graph store that additionally owns a meta table holding the backend
 * version.
 */
public static class RocksDBSystemStore extends RocksDBGraphStore {

    // Meta table, kept outside the per-type table registry
    private final RocksDBTables.Meta meta;

    public RocksDBSystemStore(BackendStoreProvider provider,
                              String database, String store) {
        super(provider, database, store);
        this.meta = new RocksDBTables.Meta(database);
    }

    /**
     * Initializes the tables, then writes the provider's driver version
     * into the meta table under the write lock.
     */
    @Override
    public synchronized void init() {
        super.init();

        Lock lock = this.storeLock().writeLock();
        lock.lock();
        try {
            RocksDBSessions.Session metaSession = super.session(HugeType.META);
            String version = this.provider().driverVersion();
            this.meta.writeVersion(metaSession, version);
            LOG.info("Write down the backend version: {}", version);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads the version stored in the meta table, under the read lock.
     */
    @Override
    public String storedVersion() {
        Lock lock = this.storeLock().readLock();
        lock.lock();
        try {
            super.checkOpened();
            RocksDBSessions.Session metaSession = super.session(null);
            return this.meta.readVersion(metaSession);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the meta table to the registered table names.
     */
    @Override
    protected List<String> tableNames() {
        List<String> names = super.tableNames();
        names.add(this.meta.table());
        return names;
    }
}
}