blob: 759acc67588bbcfb2e979fd6615e1a525142a099 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define KEY_LENGTH 16
#include <chrono>
#include <glog/logging.h>
#include <leveldb/filter_policy.h>
#include <leveldb/write_batch.h>
#include <google/protobuf/wrappers.pb.h>
#include <thread>
#include "leveldb_persistence.h"
#include "model/rdf_operators.h"
#include "util/murmur3.h"
#include "util/unique.h"
// Aborts via glog CHECK, including the LevelDB error text, if the status
// expression `s` is not OK. `s` is evaluated exactly once. glog only evaluates
// the streamed message when the check fails, so a failing database write must
// not be repeated merely to build that message. Use as a statement:
//   CHECK_STATUS(db->Write(leveldb::WriteOptions(), &batch));
#define CHECK_STATUS(s) \
    do { \
        const ::leveldb::Status check_status_result_ = (s); \
        CHECK(check_status_result_.ok()) << "Writing to database failed: " \
                                         << check_status_result_.ToString(); \
    } while (false)
using leveldb::WriteBatch;
using leveldb::Slice;
using marmotta::rdf::proto::Statement;
using marmotta::rdf::proto::Namespace;
using marmotta::rdf::proto::Resource;
namespace marmotta {
namespace persistence {
namespace {
// Writes one KEY_LENGTH-byte MurmurHash3 128-bit digest (seed 13) per
// serialized component into `result`, in argument order (a, b, c, d).
// Hashing stops at the first null component. The bytes of `result` for that
// slot and all later slots are left as the caller initialised them, so a
// partial pattern becomes a key prefix. `result` must hold 4 * KEY_LENGTH bytes.
inline void computeKey(const std::string* a, const std::string* b, const std::string* c, const std::string* d, char* result) {
    const std::string* const parts[] = {a, b, c, d};
    char* out = result;
    for (const std::string* part : parts) {
        if (part == nullptr) {
            // Wildcard: keep the caller's fill bytes for this and all later slots.
            break;
        }
#ifdef __x86_64__
        MurmurHash3_x64_128(part->data(), part->size(), 13, out);
#else
        MurmurHash3_x86_128(part->data(), part->size(), 13, out);
#endif
        out += KEY_LENGTH;
    }
}
// Slot index of each statement component inside a SPOC-ordered key.
enum Position {
    S = 0, P = 1, O = 2, C = 3
};
// Builds the key for a different index ordering by copying the KEY_LENGTH-byte
// slots of an already computed SPOC key `src` into `dest`, in the order
// a, b, c, d. This avoids re-hashing the components. Both buffers hold
// 4 * KEY_LENGTH bytes and must not overlap (memcpy).
inline void orderKey(char* dest, const char* src, Position a, Position b, Position c, Position d) {
    const Position order[] = {a, b, c, d};
    for (int slot = 0; slot < 4; ++slot) {
        memcpy(dest + slot * KEY_LENGTH, src + order[slot] * KEY_LENGTH, KEY_LENGTH * sizeof(char));
    }
}
/**
 * Helper class to define proper cache keys and identify the index to use based on
 * fields available in the pattern.
 *
 * The constructor serializes the components set in the pattern and chooses an
 * index (SPOC, CSPO, OPSC or PCOS) whose key order starts with the most
 * selective set components. It also records whether a range scan of that index
 * may return statements that do not match the pattern (NeedsFilter()).
 */
class PatternQuery {
public:
    enum IndexType {
        SPOC, CSPO, OPSC, PCOS
    };

    /**
     * Analyses `pattern`. Only serialized copies of its components are kept,
     * so `pattern` does not need to outlive this object.
     */
    explicit PatternQuery(const Statement& pattern) {
        if (pattern.has_subject()) {
            s.reset(new std::string());
            pattern.subject().SerializeToString(s.get());
        }
        if (pattern.has_predicate()) {
            p.reset(new std::string());
            pattern.predicate().SerializeToString(p.get());
        }
        if (pattern.has_object()) {
            o.reset(new std::string());
            pattern.object().SerializeToString(o.get());
        }
        if (pattern.has_context()) {
            c.reset(new std::string());
            pattern.context().SerializeToString(c.get());
        }

        if (pattern.has_subject()) {
            // Subject is usually most selective, so if it is present use the
            // subject-based databases first.
            type_ = pattern.has_context() ? CSPO : SPOC;
            // The key prefix stops at the missing predicate, so a set object
            // is not covered by the range and must be filtered.
            needsFilter = !pattern.has_predicate() && pattern.has_object();
        } else if (pattern.has_object()) {
            // Second-best option is object.
            type_ = OPSC;
            // Filter needed if there is a context (subject already checked, predicate irrelevant).
            needsFilter = pattern.has_context();
        } else if (pattern.has_predicate()) {
            // Predicate is usually least selective.
            type_ = PCOS;
            // No filter needed, object and subject are not set.
            needsFilter = false;
        } else if (pattern.has_context()) {
            type_ = CSPO;
            // No filter needed, subject, predicate and object are not set.
            needsFilter = false;
        } else {
            // Empty pattern: fall back to SPOC and scan from the beginning.
            type_ = SPOC;
            needsFilter = false;
        }
    }

    /**
     * Returns the lower bound of the key range to scan. Slots for the first
     * component missing in index order, and every slot after it, are 0x00.
     * The buffer is 4 * KEY_LENGTH bytes, allocated with calloc; the caller
     * must release it with free().
     */
    char* MinKey() const {
        char* result = static_cast<char*>(calloc(4 * KEY_LENGTH, sizeof(char)));
        compute(result);
        return result;
    }

    /**
     * Returns the upper bound of the key range to scan. The bound is inclusive:
     * the range iterator continues while key <= MaxKey. Slots for the first
     * missing component and every slot after it are 0xFF. The buffer is
     * 4 * KEY_LENGTH bytes, allocated with malloc; the caller must release it
     * with free().
     */
    char* MaxKey() const {
        char* result = static_cast<char*>(malloc(4 * KEY_LENGTH * sizeof(char)));
        for (int i = 0; i < 4 * KEY_LENGTH; i++) {
            result[i] = static_cast<char>(0xFF);
        }
        compute(result);
        return result;
    }

    IndexType Type() const {
        return type_;
    }

    PatternQuery& Type(IndexType t) {
        type_ = t;
        return *this;
    }

    // Returns true in case this query pattern cannot be answered by the index alone.
    bool NeedsFilter() const {
        return needsFilter;
    }

private:
    // Serialized pattern components; null when the component is a wildcard.
    std::unique_ptr<std::string> s, p, o, c;

    // Hashes the set components into `result` in the key order of the chosen index.
    void compute(char* result) const {
        switch (Type()) {
            case SPOC:
                computeKey(s.get(), p.get(), o.get(), c.get(), result);
                break;
            case CSPO:
                computeKey(c.get(), s.get(), p.get(), o.get(), result);
                break;
            case OPSC:
                computeKey(o.get(), p.get(), s.get(), c.get(), result);
                break;
            case PCOS:
                computeKey(p.get(), c.get(), o.get(), s.get(), result);
                break;
        }
    }

    IndexType type_ = SPOC;
    bool needsFilter = true;
};
// Base iterator wrapping a LevelDB iterator. Each value is decoded as a
// protobuf message of type T. Takes ownership of `it`, deletes it on
// destruction, and starts at the first entry of the database.
template<typename T>
class LevelDBIterator : public util::CloseableIterator<T> {
public:
    explicit LevelDBIterator(leveldb::Iterator *it)
            : it(it) {
        it->SeekToFirst();
    }

    // Owns `it`: a copy would delete the same LevelDB iterator twice.
    LevelDBIterator(const LevelDBIterator&) = delete;
    LevelDBIterator& operator=(const LevelDBIterator&) = delete;

    virtual ~LevelDBIterator() override {
        delete it;
    }

    // Decodes the entry at the current position, then advances, so the next
    // hasNext()/next() call sees the following entry. The returned reference
    // is overwritten by the next call to next().
    const T& next() override {
        const leveldb::Slice value = it->value();
        // Parse straight from LevelDB's buffer; the slice is valid until Next().
        proto.ParseFromArray(value.data(), static_cast<int>(value.size()));
        it->Next();
        return proto;
    }

    // Returns the message decoded by the most recent next() call.
    const T& current() const override {
        return proto;
    }

    virtual bool hasNext() override {
        return it->Valid();
    }

protected:
    leveldb::Iterator* it;
    T proto;
};
// Iterator over the statements of one index whose keys lie in the inclusive
// range [loKey, hiKey]. Both keys are 4 * KEY_LENGTH bytes, allocated with
// malloc/calloc by the caller. Ownership passes to this iterator, which
// free()s them.
class StatementRangeIterator : public LevelDBIterator<Statement> {
public:
    StatementRangeIterator(leveldb::Iterator *it, char *loKey, char *hiKey)
            : LevelDBIterator(it), loKey(loKey), hiKey(hiKey) {
        // Move from the database start (set by the base class) to the first key >= loKey.
        it->Seek(leveldb::Slice(loKey, 4 * KEY_LENGTH));
    }

    // Owns loKey/hiKey: a copy would free both buffers twice.
    StatementRangeIterator(const StatementRangeIterator&) = delete;
    StatementRangeIterator& operator=(const StatementRangeIterator&) = delete;

    ~StatementRangeIterator() override {
        free(loKey);
        free(hiKey);
    }

    // True while positioned on an entry whose key is <= hiKey (upper bound inclusive).
    bool hasNext() override {
        return it->Valid() && it->key().compare(leveldb::Slice(hiKey, 4 * KEY_LENGTH)) <= 0;
    }

private:
    char *loKey;
    char *hiKey;
};
// Returns true if `stmt` agrees with `pattern` on every component the pattern
// sets (context, subject, predicate, object). Components the pattern leaves
// unset act as wildcards. Comparisons use the operator!= overloads for the RDF
// proto types (presumably from model/rdf_operators.h).
bool Matches(const Statement& pattern, const Statement& stmt) {
    return !(pattern.has_context() && stmt.context() != pattern.context())
        && !(pattern.has_subject() && stmt.subject() != pattern.subject())
        && !(pattern.has_predicate() && stmt.predicate() != pattern.predicate())
        && !(pattern.has_object() && stmt.object() != pattern.object());
}
} // namespace
/**
 * Opens the LevelDB database stored at "<path>/<suffix>.db" using `options`.
 * The database is created when missing only if options.create_if_missing is set.
 * Aborts the process, logging the LevelDB status, if opening fails, so the
 * returned pointer is never null. The caller takes ownership of the database.
 */
leveldb::DB* buildDB(const std::string& path, const std::string& suffix, const leveldb::Options& options) {
    const std::string location = path + "/" + suffix + ".db";
    leveldb::DB* db = nullptr;
    leveldb::Status status = leveldb::DB::Open(options, location, &db);
    // CHECK stays active in release builds. With assert(), a failed open went
    // unreported under NDEBUG and the caller received a null database.
    CHECK(status.ok()) << "Opening database " << location << " failed: " << status.ToString();
    return db;
}
/**
 * Creates the options shared by the four statement index databases.
 * They create the database if missing, order keys with `cmp`, cache reads in
 * the shared block `cache`, use a 16 MB write buffer (faster bulk imports) and
 * install a bloom filter with 10 bits per key.
 *
 * The caller owns the returned Options. `cmp` and `cache` are stored as raw
 * pointers and are not owned by it.
 * NOTE(review): the bloom filter policy allocated here is not freed in this
 * function; confirm it is released wherever these options are destroyed.
 */
leveldb::Options* buildOptions(KeyComparator* cmp, leveldb::Cache* cache) {
    leveldb::Options* created = new leveldb::Options();
    leveldb::Options& opts = *created;
    opts.create_if_missing = true;
    opts.comparator = cmp;
    opts.block_cache = cache;
    opts.write_buffer_size = 16 * 1024 * 1024;
    opts.filter_policy = leveldb::NewBloomFilterPolicy(10);
    return created;
}
// Options for the auxiliary databases (namespace prefix/URI and metadata).
// Every field keeps LevelDB's default except that a missing database is
// created on open.
leveldb::Options buildNsOptions() {
    leveldb::Options nsOptions;
    nsOptions.create_if_missing = true;
    return nsOptions;
}
/**
 * Opens every database below `path`.
 *
 * The namespace and metadata databases are opened in the initializer list.
 * The four statement indexes (spoc, cspo, opsc, pcos) share `options`, i.e.
 * the custom key comparator and an LRU block cache of `cacheSize`. They are
 * opened on four threads that are joined before the constructor continues.
 * Aborts (CHECK_NOTNULL) if any statement index is null after opening.
 */
LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize)
        : comparator(new KeyComparator())
        , cache(leveldb::NewLRUCache(cacheSize))
        , options(buildOptions(comparator.get(), cache.get()))
        , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions()))
        , db_ns_url(buildDB(path, "ns_url", buildNsOptions()))
        , db_meta(buildDB(path, "metadata", buildNsOptions())) {
    // LevelDB does a lot of computation on open, so the indexes open in parallel.
    // Each thread assigns only its own member, and all threads are joined below.
    std::vector<std::thread> openers;
    openers.reserve(4);
    openers.emplace_back([&]() { db_spoc.reset(buildDB(path, "spoc", *options)); });
    openers.emplace_back([&]() { db_cspo.reset(buildDB(path, "cspo", *options)); });
    openers.emplace_back([&]() { db_opsc.reset(buildDB(path, "opsc", *options)); });
    openers.emplace_back([&]() { db_pcos.reset(buildDB(path, "pcos", *options)); });
    for (std::thread& opener : openers) {
        opener.join();
    }

    CHECK_NOTNULL(db_spoc.get());
    CHECK_NOTNULL(db_cspo.get());
    CHECK_NOTNULL(db_opsc.get());
    CHECK_NOTNULL(db_pcos.get());
    LOG(INFO) << "LevelDB Database initialised.";
}
/**
 * Imports every namespace produced by `it`. Each namespace is staged in the
 * prefix and URI batches. Both batches are written once the iterator is
 * exhausted, and a failed write aborts (CHECK_STATUS).
 * @return number of namespaces consumed from `it`
 */
int64_t LevelDBPersistence::AddNamespaces(NamespaceIterator& it) {
    DLOG(INFO) << "Starting batch namespace import operation.";
    leveldb::WriteBatch batch_prefix, batch_url;
    int64_t imported = 0;
    for (; it.hasNext(); ++imported) {
        AddNamespace(it.next(), batch_prefix, batch_url);
    }

    CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &batch_prefix));
    CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &batch_url));
    DLOG(INFO) << "Imported " << imported << " namespaces";
    return imported;
}
/**
 * Returns an iterator over the namespaces matching `pattern`:
 *  - prefix set: point lookup in the prefix database (takes precedence over uri);
 *  - otherwise, uri set: point lookup in the URI database;
 *  - neither set: iterate over every entry of the prefix database.
 * A point lookup whose status is not OK yields an empty iterator.
 */
std::unique_ptr<LevelDBPersistence::NamespaceIterator> LevelDBPersistence::GetNamespaces(
        const rdf::proto::Namespace &pattern) {
    DLOG(INFO) << "Get namespaces matching pattern " << pattern.DebugString();

    const bool byPrefix = !pattern.prefix().empty();
    const bool byUri = !pattern.uri().empty();
    if (!byPrefix && !byUri) {
        // Pattern was empty, iterate over all namespaces and report them.
        return util::make_unique<LevelDBIterator<Namespace>>(
                db_ns_prefix->NewIterator(leveldb::ReadOptions()));
    }

    // Either prefix or uri given, report the correct namespace value.
    leveldb::DB* source = byPrefix ? db_ns_prefix.get() : db_ns_url.get();
    const std::string& lookupKey = byPrefix ? pattern.prefix() : pattern.uri();
    std::string serialized;
    if (!source->Get(leveldb::ReadOptions(), lookupKey, &serialized).ok()) {
        return util::make_unique<util::EmptyIterator<Namespace>>();
    }
    Namespace found;
    found.ParseFromString(serialized);
    return util::make_unique<util::SingletonIterator<Namespace>>(std::move(found));
}
/**
 * Invokes `callback` for each namespace matching `pattern`. Stops as soon as
 * the callback returns false. The logged count includes that final call.
 */
void LevelDBPersistence::GetNamespaces(
        const Namespace &pattern, LevelDBPersistence::NamespaceHandler callback) {
    int64_t count = 0;
    {
        auto matches = GetNamespaces(pattern);
        bool keepGoing = true;
        while (keepGoing && matches->hasNext()) {
            keepGoing = callback(matches->next());
            ++count;
        }
    }
    DLOG(INFO) << "Get namespaces done (count=" << count <<")";
}
/**
 * Bulk import. Drains `it` into one WriteBatch per index, writes the four
 * batches on four threads (one per index database), and logs the elapsed time.
 * A failed write aborts (CHECK_STATUS).
 * @return number of statements consumed from `it`
 */
int64_t LevelDBPersistence::AddStatements(StatementIterator& it) {
    const auto start = std::chrono::steady_clock::now();
    LOG(INFO) << "Starting batch statement import operation.";

    leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
    int64_t count = 0;
    for (; it.hasNext(); ++count) {
        AddStatement(it.next(), batch_spoc, batch_cspo, batch_opsc, batch_pcos);
    }

    // Each index is a separate database, so the batches can be written concurrently.
    std::vector<std::thread> writers;
    writers.reserve(4);
    writers.emplace_back([&]() { CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); });
    for (std::thread& writer : writers) {
        writer.join();
    }

    const double elapsedMs = std::chrono::duration<double, std::milli>(
            std::chrono::steady_clock::now() - start).count();
    LOG(INFO) << "Imported " << count << " statements (time=" << elapsedMs << "ms).";
    return count;
}
/**
 * Returns an iterator over all statements matching `pattern`.
 *
 * PatternQuery picks the index to scan (SPOC, CSPO, OPSC or PCOS) and the key
 * range [MinKey, MaxKey] within it. If that range cannot express every
 * component set in the pattern (query.NeedsFilter()), the scanned statements
 * are also checked with Matches().
 *
 * The returned iterator does not refer to `pattern`: the filter keeps its own
 * copy, so the caller may destroy the pattern while still iterating.
 */
std::unique_ptr<LevelDBPersistence::StatementIterator> LevelDBPersistence::GetStatements(
        const rdf::proto::Statement &pattern) {
    DLOG(INFO) << "Get statements matching pattern " << pattern.DebugString();
    PatternQuery query(pattern);
    leveldb::DB* db = nullptr;
    switch (query.Type()) {
        case PatternQuery::SPOC:
            db = db_spoc.get();
            DLOG(INFO) << "Query: Using index type SPOC";
            break;
        case PatternQuery::CSPO:
            db = db_cspo.get();
            DLOG(INFO) << "Query: Using index type CSPO";
            break;
        case PatternQuery::OPSC:
            db = db_opsc.get();
            DLOG(INFO) << "Query: Using index type OPSC";
            break;
        case PatternQuery::PCOS:
            db = db_pcos.get();
            DLOG(INFO) << "Query: Using index type PCOS";
            break;
    }

    if (query.NeedsFilter()) {
        DLOG(INFO) << "Retrieving statements with filter.";
        // Capture the pattern by value. The returned iterator outlives this call,
        // so capturing the reference parameter could leave the filter dangling.
        return util::make_unique<util::FilteringIterator<Statement>>(
                new StatementRangeIterator(
                        db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey()),
                [pattern](const Statement& stmt) -> bool { return Matches(pattern, stmt); });
    } else {
        DLOG(INFO) << "Retrieving statements without filter.";
        return util::make_unique<StatementRangeIterator>(
                db->NewIterator(leveldb::ReadOptions()), query.MinKey(), query.MaxKey());
    }
}
/**
 * Invokes `callback` for each statement matching `pattern` until the callback
 * returns false or the matches run out. Then logs the number of callback
 * invocations and the elapsed time.
 */
void LevelDBPersistence::GetStatements(
        const Statement& pattern, std::function<bool(const Statement&)> callback) {
    const auto start = std::chrono::steady_clock::now();
    int64_t count = 0;
    {
        auto matches = GetStatements(pattern);
        bool keepGoing = true;
        while (keepGoing && matches->hasNext()) {
            keepGoing = callback(matches->next());
            ++count;
        }
    }
    const double elapsedMs = std::chrono::duration<double, std::milli>(
            std::chrono::steady_clock::now() - start).count();
    DLOG(INFO) << "Get statements done (count=" << count << ", time="
               << elapsedMs
               << "ms).";
}
/**
 * Removes all statements matching `pattern` from the four indexes.
 * Deletions are staged in one WriteBatch per index, and the four batches are
 * written concurrently. A failed write aborts (CHECK_STATUS).
 * @return number of matching statements whose deletion was staged
 */
int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& pattern) {
    auto start = std::chrono::steady_clock::now();
    DLOG(INFO) << "Remove statements matching pattern " << pattern.DebugString();

    leveldb::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos;
    const int64_t count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos);

    // Each index is a separate database, so the batches can be written concurrently.
    std::vector<std::thread> writers;
    writers.reserve(4);
    writers.emplace_back([&]() { CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &batch_pcos)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &batch_opsc)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &batch_cspo)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &batch_spoc)); });
    for (std::thread& writer : writers) {
        writer.join();
    }

    DLOG(INFO) << "Removed " << count << " statements (time=" <<
               std::chrono::duration <double, std::milli> (
                       std::chrono::steady_clock::now() - start).count()
               << "ms).";
    return count;
}
/**
 * Applies a stream of updates (statement/namespace additions and removals).
 * All changes are staged in one WriteBatch per database, and the six batches
 * are written concurrently at the end. A failed write aborts (CHECK_STATUS).
 *
 * NOTE(review): removals are matched against data already stored in the
 * databases, not against changes staged earlier in the same call. A statement
 * or namespace that is first added and then removed within one Update() is
 * therefore still present afterwards.
 *
 * @return statistics: added_stmts/added_ns count add requests;
 *         removed_stmts/removed_ns count stored entries matched by the
 *         removal patterns.
 */
UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator &it) {
    auto start = std::chrono::steady_clock::now();
    LOG(INFO) << "Starting batch update operation.";
    UpdateStatistics stats;
    WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url;
    while (it.hasNext()) {
        // Bind by reference instead of copying each update message.
        const auto& next = it.next();
        if (next.has_stmt_added()) {
            AddStatement(next.stmt_added(), b_spoc, b_cspo, b_opsc, b_pcos);
            stats.added_stmts++;
        } else if (next.has_stmt_removed()) {
            stats.removed_stmts +=
                    RemoveStatements(next.stmt_removed(), b_spoc, b_cspo, b_opsc, b_pcos);
        } else if (next.has_ns_added()) {
            AddNamespace(next.ns_added(), b_prefix, b_url);
            stats.added_ns++;
        } else if (next.has_ns_removed()) {
            // Count the stored namespaces this pattern matches. RemoveNamespace
            // reports nothing, and removed_ns would otherwise always be 0.
            GetNamespaces(next.ns_removed(), [&stats](const Namespace&) -> bool {
                stats.removed_ns++;
                return true;
            });
            RemoveNamespace(next.ns_removed(), b_prefix, b_url);
        }
    }

    // Every batch targets a different database, so they can be written concurrently.
    std::vector<std::thread> writers;
    writers.reserve(6);
    writers.emplace_back([&]() { CHECK_STATUS(db_pcos->Write(leveldb::WriteOptions(), &b_pcos)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_opsc->Write(leveldb::WriteOptions(), &b_opsc)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_cspo->Write(leveldb::WriteOptions(), &b_cspo)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_spoc->Write(leveldb::WriteOptions(), &b_spoc)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_ns_prefix->Write(leveldb::WriteOptions(), &b_prefix)); });
    writers.emplace_back([&]() { CHECK_STATUS(db_ns_url->Write(leveldb::WriteOptions(), &b_url)); });
    for (std::thread& writer : writers) {
        writer.join();
    }

    LOG(INFO) << "Batch update complete. (statements added: " << stats.added_stmts
              << ", statements removed: " << stats.removed_stmts
              << ", namespaces added: " << stats.added_ns
              << ", namespaces removed: " << stats.removed_ns
              << ", time=" << std::chrono::duration <double, std::milli> (
                      std::chrono::steady_clock::now() - start).count() << "ms).";
    return stats;
}
/**
 * Stages `ns` in both namespace batches: under its prefix in `ns_prefix` and
 * under its URI in `ns_url`. In both cases the value is the serialized
 * Namespace message.
 */
void LevelDBPersistence::AddNamespace(
        const Namespace &ns, WriteBatch &ns_prefix, WriteBatch &ns_url) {
    DLOG(INFO) << "Adding namespace " << ns.DebugString();
    std::string serialized;
    ns.SerializeToString(&serialized);
    const std::string& byPrefix = ns.prefix();
    const std::string& byUri = ns.uri();
    ns_prefix.Put(byPrefix, serialized);
    ns_url.Put(byUri, serialized);
}
void LevelDBPersistence::RemoveNamespace(
const Namespace &pattern, WriteBatch &ns_prefix, WriteBatch &ns_url) {
DLOG(INFO) << "Removing namespaces matching pattern " << pattern.DebugString();
GetNamespaces(pattern, [&ns_prefix, &ns_url](const rdf::proto::Namespace& ns) -> bool {
ns_prefix.Delete(ns.prefix());
ns_url.Delete(ns.uri());
return true;
});
}
void LevelDBPersistence::AddStatement(
const Statement &stmt,
WriteBatch &spoc, WriteBatch &cspo, WriteBatch &opsc, WriteBatch &pcos) {
DLOG(INFO) << "Adding statement " << stmt.DebugString();
std::string buffer, bufs, bufp, bufo, bufc;
stmt.SerializeToString(&buffer);
stmt.subject().SerializeToString(&bufs);
stmt.predicate().SerializeToString(&bufp);
stmt.object().SerializeToString(&bufo);
stmt.context().SerializeToString(&bufc);
char *k_spoc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
spoc.Put(leveldb::Slice(k_spoc, 4 * KEY_LENGTH), buffer);
char *k_cspo = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
orderKey(k_cspo, k_spoc, C, S, P, O);
cspo.Put(leveldb::Slice(k_cspo, 4 * KEY_LENGTH), buffer);
char *k_opsc = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
orderKey(k_opsc, k_spoc, O, P, S, C);
opsc.Put(leveldb::Slice(k_opsc, 4 * KEY_LENGTH), buffer);
char *k_pcos = (char *) calloc(4 * KEY_LENGTH, sizeof(char));
orderKey(k_pcos, k_spoc, P, C, O, S);
pcos.Put(leveldb::Slice(k_pcos, 4 * KEY_LENGTH), buffer);
free(k_spoc);
free(k_cspo);
free(k_opsc);
free(k_pcos);
}
/**
 * Stages, in the four index batches, the deletion of every stored statement
 * matching `pattern`. Nothing is written here. Matches are looked up in the
 * databases, not in the batches.
 * @return number of matching statements
 */
int64_t LevelDBPersistence::RemoveStatements(
        const Statement& pattern,
        WriteBatch& spoc, WriteBatch& cspo, WriteBatch& opsc, WriteBatch&pcos) {
    DLOG(INFO) << "Removing statements matching " << pattern.DebugString();
    int64_t count = 0;
    // Reused across matches; SerializeToString overwrites their contents.
    std::string bufs, bufp, bufo, bufc;
    // Take the statement by reference: the by-value parameter copied every match.
    GetStatements(pattern, [&](const Statement& stmt) -> bool {
        stmt.subject().SerializeToString(&bufs);
        stmt.predicate().SerializeToString(&bufp);
        stmt.object().SerializeToString(&bufo);
        stmt.context().SerializeToString(&bufc);

        // Fixed-size keys on the stack; WriteBatch::Delete copies the key bytes.
        char k_spoc[4 * KEY_LENGTH] = {};
        char k_cspo[4 * KEY_LENGTH] = {};
        char k_opsc[4 * KEY_LENGTH] = {};
        char k_pcos[4 * KEY_LENGTH] = {};

        computeKey(&bufs, &bufp, &bufo, &bufc, k_spoc);
        spoc.Delete(leveldb::Slice(k_spoc, 4 * KEY_LENGTH));

        orderKey(k_cspo, k_spoc, C, S, P, O);
        cspo.Delete(leveldb::Slice(k_cspo, 4 * KEY_LENGTH));

        orderKey(k_opsc, k_spoc, O, P, S, C);
        opsc.Delete(leveldb::Slice(k_opsc, 4 * KEY_LENGTH));

        orderKey(k_pcos, k_spoc, P, C, O, S);
        pcos.Delete(leveldb::Slice(k_pcos, 4 * KEY_LENGTH));

        count++;
        return true;
    });
    return count;
}
/**
 * Orders keys by their first 4 * KEY_LENGTH bytes, compared with memcmp.
 * Bytes beyond that length are ignored, so two keys of 4 * KEY_LENGTH or more
 * bytes with equal prefixes compare equal; for such keys the ordering is
 * exactly plain memcmp over 4 * KEY_LENGTH bytes. A shorter key is compared
 * only over its actual length, instead of reading past its end, and sorts
 * before a longer key that shares its prefix.
 */
int KeyComparator::Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
    const size_t keySize = 4 * KEY_LENGTH;
    const size_t aLen = a.size() < keySize ? a.size() : keySize;
    const size_t bLen = b.size() < keySize ? b.size() : keySize;
    const int byBytes = memcmp(a.data(), b.data(), aLen < bLen ? aLen : bLen);
    if (byBytes != 0) {
        return byBytes;
    }
    return aLen < bLen ? -1 : (aLen > bLen ? 1 : 0);
}
int64_t LevelDBPersistence::Size() {
int64_t count = 0;
leveldb::Iterator* it = db_cspo->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
count++;
}
delete it;
return count;
}
} // namespace persistence
} // namespace marmotta