blob: ef04230a31f95a31a03b670ee777f12dd4763b1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RocksDbPersistableKeyValueStoreService.h"
#include "utils/StringUtils.h"
#include <fstream>
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace controllers {
core::Property RocksDbPersistableKeyValueStoreService::Directory(
core::PropertyBuilder::createProperty("Directory")->withDescription("Path to a directory for the database")
->isRequired(true)->build());
RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id)
: controllers::KeyValueStoreService(name, id)
, controllers::AbstractAutoPersistingKeyValueStoreService(name, id)
, logger_(logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger()) {
}
RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
: controllers::KeyValueStoreService(name, uuid)
, controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid)
, logger_(logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger()) {
}
void RocksDbPersistableKeyValueStoreService::initialize() {
AbstractAutoPersistingKeyValueStoreService::initialize();
std::set<core::Property> supportedProperties;
supportedProperties.insert(Directory);
updateSupportedProperties(supportedProperties);
}
void RocksDbPersistableKeyValueStoreService::onEnable() {
if (configuration_ == nullptr) {
logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
return;
}
AbstractAutoPersistingKeyValueStoreService::onEnable();
if (!getProperty(Directory.getName(), directory_)) {
logger_->log_error("Invalid or missing property: Directory");
return;
}
db_.reset();
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
options.use_direct_reads = true;
// Use the same buffer settings as the FlowFileRepository
options.write_buffer_size = 8 << 20;
options.max_write_buffer_number = 20;
options.min_write_buffer_number_to_merge = 1;
if (!always_persist_) {
options.manual_wal_flush = true;
}
rocksdb::DB* db = nullptr;
rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
if (status.ok()) {
db_.reset(db);
logger_->log_trace("Successfully opened RocksDB database at %s", directory_.c_str());
} else {
logger_->log_error("Failed to open RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
return;
}
if (always_persist_) {
default_write_options.sync = true;
}
logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService");
}
void RocksDbPersistableKeyValueStoreService::notifyStop() {
AbstractAutoPersistingKeyValueStoreService::notifyStop();
db_.reset();
}
bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
if (!db_) {
return false;
}
rocksdb::Status status = db_->Put(default_write_options, key, value);
if (!status.ok()) {
logger_->log_error("Failed to Put key %s to RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.getState());
return false;
}
return true;
}
bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::string& value) {
if (!db_) {
return false;
}
rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
if (!status.ok()) {
logger_->log_error("Failed to Get key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.getState());
return false;
}
return true;
}
bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
if (!db_) {
return false;
}
kvs.clear();
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
kvs.emplace(it->key().ToString(), it->value().ToString());
}
if (!it->status().ok()) {
logger_->log_error("Encountered error when iterating through RocksDB database at %s, error: %s", directory_.c_str(), it->status().getState());
return false;
}
return true;
}
bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
if (!db_) {
return false;
}
rocksdb::Status status = db_->Delete(default_write_options, key);
if (!status.ok()) {
logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
return false;
}
return true;
}
bool RocksDbPersistableKeyValueStoreService::clear() {
if (!db_) {
return false;
}
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rocksdb::Status status = db_->Delete(default_write_options, it->key());
if (!status.ok()) {
logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.getState());
return false;
}
}
if (!it->status().ok()) {
logger_->log_error("Encountered error when iterating through RocksDB database at %s, error: %s", directory_.c_str(), it->status().getState());
return false;
}
return true;
}
bool RocksDbPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
if (!db_) {
return false;
}
throw std::logic_error("Unsupported method");
}
bool RocksDbPersistableKeyValueStoreService::persist() {
if (!db_) {
return false;
}
if (always_persist_) {
return true;
}
return db_->FlushWAL(true /*sync*/).ok();
}
} /* namespace controllers */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */