blob: 3d2b7f867a61ac9e2947caa08def48ae67d20ccc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.rocksdbsst;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBIngester;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBSessions;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBStdSessions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.util.E;
public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;
/**
 * Create a session pool that writes SST files below {@code dataPath}.
 * The directory (and any missing parents) is created when it does not
 * exist yet; the state check fails if that creation is unsuccessful.
 *
 * @param config   forwarded to the parent constructor
 * @param database forwarded to the parent constructor
 * @param store    forwarded to the parent constructor
 * @param dataPath root directory for the SST files of this pool
 */
public RocksDBSstSessions(HugeConfig config, String database, String store,
                          String dataPath) {
    super(config, database, store);

    this.tables = new ConcurrentHashMap<>();
    this.dataPath = dataPath;

    File dir = new File(dataPath);
    if (dir.exists()) {
        // Nothing to prepare
        return;
    }
    E.checkState(dir.mkdirs(), "Can't mkdir '%s'", dir);
}
/**
* Create a session pool and immediately create an SST writer for each
* name in {@code tableNames}.
*
* NOTE(review): the parameters here are declared in the order
* (dataPath, database, store) but are forwarded positionally to the
* constructor declared as (database, store, dataPath). As written, the
* value received as {@code dataPath} is used as the database name,
* {@code database} as the store name and {@code store} as the data
* directory. Either the parameter names or the forwarding order is wrong;
* confirm against the callers before changing either.
*
* @param tableNames tables to create writers for
* @throws RocksDBException if creating a table writer fails
*/
public RocksDBSstSessions(HugeConfig config, String dataPath,
String database, String store,
List<String> tableNames) throws RocksDBException {
// Arguments are forwarded by position, see the NOTE(review) above
this(config, dataPath, database, store);
for (String table : tableNames) {
this.createTable(table);
}
}
/**
 * Build a pool that reuses the state of {@code origin}: the new instance
 * points at the same data path and at the very same table-to-writer map
 * object (the map is shared, not copied).
 *
 * @param config   forwarded to the parent constructor
 * @param database forwarded to the parent constructor
 * @param store    forwarded to the parent constructor
 * @param origin   pool whose data path and writer map are reused
 */
private RocksDBSstSessions(HugeConfig config, String database, String store,
                           RocksDBSstSessions origin) {
    super(config, database, store);

    // Same map instance: tables created/dropped via either pool are visible to both
    this.tables = origin.tables;
    this.dataPath = origin.dataPath;
}
/**
* Does nothing: this implementation has no open step.
*/
@Override
public void open() throws Exception {
// pass
}
/**
* Always reports this pool as opened.
*
* @return {@code true} unconditionally
*/
@Override
protected boolean opened() {
return true;
}
/**
* Names of the tables that currently have a registered SST writer.
*
* @return the key set of the table-to-writer map (the map's own key-set
*         view, not a copy)
*/
@Override
public Set<String> openedTables() {
return this.tables.keySet();
}
/**
 * Create an SST writer for each of the given tables, in argument order.
 * Note that the parameter shadows the {@code tables} field inside this
 * method.
 *
 * @param tables names of the tables to create
 * @throws RocksDBException if creating one of the writers fails
 */
@Override
public synchronized void createTable(String... tables)
                                     throws RocksDBException {
    for (int i = 0; i < tables.length; i++) {
        this.createTable(tables[i]);
    }
}
/**
 * Create and register the SST file writer for one table.
 * The writer is opened on the file {@code 0001 + RocksDBIngester.SST}
 * inside the directory {@code <dataPath>/<table>}, which is created first
 * if needed. The writer is stored in the table map under {@code table},
 * replacing any entry already stored for that name.
 *
 * The option objects are native resources that are only needed to
 * construct and open the writer, so they are released before returning.
 * If opening the file fails, the writer itself is closed as well, so a
 * failed call does not leave native handles behind.
 *
 * @param table name of the table, also used as the sub-directory name
 * @throws RocksDBException if the SST file can't be opened
 * @throws BackendException if the table directory can't be created
 */
private void createTable(String table) throws RocksDBException {
    String number = String.format("%04d", 1);
    Path sstFile = Paths.get(this.dataPath, table,
                             number + RocksDBIngester.SST);
    try {
        FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile());
    } catch (IOException e) {
        throw new BackendException("Can't make directory for sst: '%s'",
                                   e, sstFile.toString());
    }

    try (EnvOptions env = new EnvOptions();
         Options options = new Options()) {
        // The same Options object is passed for every option argument
        RocksDBStdSessions.initOptions(this.config(), options, options,
                                       options, options);
        // NOTE: unset merge op due to SIGSEGV when cf.setMergeOperatorName()
        options.setMergeOperatorName("not-exist-merge-op");

        SstFileWriter sst = new SstFileWriter(env, options);
        try {
            sst.open(sstFile.toString());
        } catch (RocksDBException | RuntimeException e) {
            // Don't leak the native writer when the file can't be opened
            sst.close();
            throw e;
        }
        this.tables.put(table, sst);
    }
}
/**
 * Drop each of the given tables, in argument order.
 * Note that the parameter shadows the {@code tables} field inside this
 * method.
 *
 * @param tables names of the tables to drop
 * @throws RocksDBException declared by the per-table drop
 */
@Override
public synchronized void dropTable(String... tables)
                                   throws RocksDBException {
    for (int i = 0; i < tables.length; i++) {
        this.dropTable(tables[i]);
    }
}
/**
 * Remove the writer registered for {@code table}, if any.
 * The writer is expected to be closed already; this is only verified by
 * an assertion, so nothing is checked when assertions are disabled. The
 * entry is removed from the map before the assertion is evaluated.
 *
 * @param table name of the table to unregister
 * @throws RocksDBException declared for callers; not thrown by this body
 */
public void dropTable(String table) throws RocksDBException {
    SstFileWriter removed = this.tables.remove(table);
    assert !(removed != null && removed.isOwningHandle()) :
           "Please close table before drop to ensure call sst.finish()";
}
/**
* Tell whether a writer is currently registered for the given table.
*
* @param table table name to look up
* @return {@code true} if the table map contains {@code table}
*/
@Override
public boolean existsTable(String table) {
return this.tables.containsKey(table);
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public List<String> property(String property) {
throw new UnsupportedOperationException("RocksDBSstStore property()");
}
/**
* Not supported by this implementation.
*
* @throws NotSupportException always
*/
@Override
public void compactRange() {
throw new NotSupportException("RocksDBSstStore compactRange()");
}
/**
* Create another pool for the given config/database/store that is built
* from this instance via the private constructor, which reuses this
* pool's data path and its table-to-writer map object.
*
* @return the new {@code RocksDBSstSessions}
*/
@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBSstSessions(config, database, store, this);
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public void createSnapshot(String snapshotPath) {
throw new UnsupportedOperationException("createSnapshot");
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public void resumeSnapshot(String snapshotPath) {
throw new UnsupportedOperationException("resumeSnapshot");
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public String buildSnapshotPath(String snapshotPrefix) {
throw new UnsupportedOperationException("buildSnapshotPath");
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public String hardLinkSnapshot(String snapshotPath)
throws RocksDBException {
throw new UnsupportedOperationException("hardLinkSnapshot");
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public void reloadRocksDB() {
throw new UnsupportedOperationException("reloadRocksDB");
}
/**
* Not supported by this implementation.
*
* @throws UnsupportedOperationException always
*/
@Override
public void forceCloseRocksDB() {
throw new UnsupportedOperationException("forceCloseRocksDB");
}
/**
 * Look up the SST writer registered for a table.
 *
 * @param table name of the table
 * @return the writer stored for {@code table}, never {@code null}
 * @throws BackendException if no writer is registered for the table
 */
private SstFileWriter table(String table) {
    SstFileWriter writer = this.tables.get(table);
    if (writer != null) {
        return writer;
    }
    throw new BackendException("Table '%s' is not opened", table);
}
/**
* Return the session provided by the superclass' {@code getOrNewSession()},
* cast to {@code Session}.
*/
@Override
public final Session session() {
return (Session) super.getOrNewSession();
}
/**
* Create a new {@code SstSession} bound to this pool.
*/
@Override
protected Session newSession() {
return new SstSession();
}
/**
 * Finish and close every registered SST writer, then forget all tables.
 *
 * A writer that received no entries makes {@code finish()} fail with a
 * "no entries" error; that specific failure is tolerated. Any other
 * failure is rethrown as a {@link BackendException}, in which case the
 * table map is left as it is.
 *
 * Each writer is closed after {@code finish()} whatever its outcome, so
 * the native handle is released both for empty writers and for writers
 * whose finish failed.
 *
 * @throws BackendException if a writer fails to finish for a reason other
 *                          than having no entries
 */
@Override
protected synchronized void doClose() {
    final String noEntries = "Can't create sst file with no entries";
    for (SstFileWriter sst : this.tables.values()) {
        E.checkState(sst.isOwningHandle(), "SstFileWriter closed");
        try {
            sst.finish();
        } catch (RocksDBException e) {
            // Compare from the constant side: the message may be null
            if (!noEntries.equals(e.getMessage())) {
                throw new BackendException("Failed to close SstFileWriter", e);
            }
            // Otherwise: empty writer, nothing to flush
        } finally {
            // Always release the native writer, even if finish() failed
            sst.close();
        }
    }
    this.tables.clear();
}
/**
 * Session that buffers puts per table in memory and, on commit, hands them
 * to the SST file writers of the enclosing pool. Only {@code put} is
 * accepted as a mutation; every other write operation is rejected and all
 * read operations return nothing.
 */
private final class SstSession extends Session {

    // Buffered key/value pairs, grouped by table name
    private final Map<String, Changes> batch;

    public SstSession() {
        this.batch = new HashMap<>();
    }

    /**
     * Flag the session as opened
     */
    @Override
    public void open() {
        this.opened = true;
    }

    /**
     * Flag the session as closed (asserts that it is closeable first)
     */
    @Override
    public void close() {
        assert this.closeable();
        this.opened = false;
    }

    /**
     * Whether at least one table has an entry in the buffer
     */
    @Override
    public boolean hasChanges() {
        return !this.batch.isEmpty();
    }

    /**
     * Write all buffered puts to the SST writers of their tables.
     * Tables with no buffered pairs and tables whose name ends with "i"
     * are skipped. The buffer is cleared only when everything was written.
     *
     * @return the number of tables that were present in the buffer
     */
    @Override
    public Integer commit() {
        int tableCount = this.batch.size();
        if (tableCount <= 0) {
            return 0;
        }

        try {
            for (Entry<String, Changes> pending : this.batch.entrySet()) {
                String name = pending.getKey();
                Changes changes = pending.getValue();
                // Only non-empty, non-index tables are written
                boolean writable = !changes.isEmpty() && !name.endsWith("i");
                if (writable) {
                    // TODO: limit individual SST file size
                    SstFileWriter sst = table(name);
                    for (Pair<byte[], byte[]> kv : changes) {
                        sst.put(kv.getLeft(), kv.getRight());
                    }
                }
            }
        } catch (RocksDBException e) {
            throw new BackendException("Failed to commit", e);
        }

        // Reached only on success, so a failed commit keeps its buffer
        this.batch.clear();
        return tableCount;
    }

    /**
     * Discard everything buffered since the last commit
     */
    @Override
    public void rollback() {
        this.batch.clear();
    }

    /**
     * Data path of the enclosing pool
     */
    @Override
    public String dataPath() {
        return RocksDBSstSessions.this.dataPath;
    }

    /**
     * Same value as dataPath(): the data path of the enclosing pool
     */
    @Override
    public String walPath() {
        return RocksDBSstSessions.this.dataPath;
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public String property(String table, String property) {
        throw new NotSupportException("RocksDBSstStore property()");
    }

    /**
     * No key range is available: always returns null
     */
    @Override
    public Pair<byte[], byte[]> keyRange(String table) {
        return null;
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public void compactRange(String table) {
        throw new NotSupportException("RocksDBSstStore compactRange()");
    }

    /**
     * Buffer a KV record for a table until the next commit
     */
    @Override
    public void put(String table, byte[] key, byte[] value) {
        Changes pending = this.batch.get(table);
        if (pending != null) {
            pending.add(Pair.of(key, value));
            return;
        }
        // First record for this table: start a new list
        pending = new Changes();
        pending.add(Pair.of(key, value));
        this.batch.put(table, pending);
    }

    /**
     * Not supported: always throws NotSupportException.
     * For more details about merge-operator:
     * https://github.com/facebook/rocksdb/wiki/merge-operator
     */
    @Override
    public void merge(String table, byte[] key, byte[] value) {
        throw new NotSupportException("RocksDBSstStore merge()");
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public void increase(String table, byte[] key, byte[] value) {
        throw new NotSupportException("RocksDBSstStore increase()");
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public void delete(String table, byte[] key) {
        throw new NotSupportException("RocksDBSstStore delete()");
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public void deleteSingle(String table, byte[] key) {
        throw new NotSupportException("RocksDBSstStore deleteSingle()");
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public void deletePrefix(String table, byte[] key) {
        throw new NotSupportException("RocksDBSstStore deletePrefix()");
    }

    /**
     * Not supported: always throws NotSupportException
     */
    @Override
    public void deleteRange(String table, byte[] keyFrom, byte[] keyTo) {
        throw new NotSupportException("RocksDBSstStore deleteRange()");
    }

    /**
     * Single-key lookup: always returns null
     */
    @Override
    public byte[] get(String table, byte[] key) {
        return null;
    }

    /**
     * Multi-key lookup: asserts nothing is buffered, then returns an
     * empty iterator
     */
    @Override
    public BackendColumnIterator get(String table, List<byte[]> keys) {
        assert !this.hasChanges();
        return BackendColumnIterator.empty();
    }

    /**
     * Full-table scan: asserts nothing is buffered, then returns an
     * empty iterator
     */
    @Override
    public BackendColumnIterator scan(String table) {
        assert !this.hasChanges();
        return BackendColumnIterator.empty();
    }

    /**
     * Prefix scan: asserts nothing is buffered, then returns an empty
     * iterator
     */
    @Override
    public BackendColumnIterator scan(String table, byte[] prefix) {
        assert !this.hasChanges();
        return BackendColumnIterator.empty();
    }

    /**
     * Range scan: asserts nothing is buffered, then returns an empty
     * iterator
     */
    @Override
    public BackendColumnIterator scan(String table,
                                      byte[] keyFrom,
                                      byte[] keyTo,
                                      int scanType) {
        assert !this.hasChanges();
        return BackendColumnIterator.empty();
    }
}
/**
* List of key/value pairs (both as byte arrays); the session keeps one
* such list per table for the records buffered by put().
*/
private static class Changes extends ArrayList<Pair<byte[], byte[]>> {
// Explicit serial version id, since ArrayList is Serializable
private static final long serialVersionUID = 9047034706183029125L;
}
}