blob: 26956e48090dad6e62092adb1ff669556fdeb54a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RocksDbStateStorage.h"
#include <cinttypes>
#include <fstream>
#include <utility>
#include "../encryption/RocksDbEncryptionProvider.h"
#include "core/Resource.h"
#include "utils/Locations.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::controllers {
/**
 * Creates the state storage service.
 *
 * @param name  service name, forwarded unchanged to KeyValueStateStorage
 * @param uuid  service identifier, forwarded unchanged to KeyValueStateStorage
 */
RocksDbStateStorage::RocksDbStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
    : KeyValueStateStorage(name, uuid) {
}
/**
 * Stops the auto-persistor before the object is torn down; the persist callback
 * handed to it in onEnable() captures `this`.
 */
RocksDbStateStorage::~RocksDbStateStorage() {
  auto_persistor_.stop();
}
void RocksDbStateStorage::initialize() {
ControllerServiceImpl::initialize();
setSupportedProperties(Properties);
}
void RocksDbStateStorage::onEnable() {
if (configuration_ == nullptr) {
logger_->log_debug("Cannot enable RocksDbStateStorage");
return;
}
const auto always_persist = getProperty(AlwaysPersist.name)
| utils::andThen(parsing::parseBool)
| utils::orThrow("RocksDbStateStorage::AlwaysPersist is a required Property");
logger_->log_info("Always Persist property: {}", always_persist);
const auto auto_persistence_interval = getProperty(AutoPersistenceInterval.name)
| utils::andThen(parsing::parseDuration<std::chrono::milliseconds>)
| utils::orThrow("RocksDbStateStorage::AutoPersistenceInterval is a required Property");
logger_->log_info("Auto Persistence Interval property: {}", auto_persistence_interval);
directory_ = getProperty(Directory.name) | utils::orThrow("RocksDbStateStorage::Directory is required property");
auto_persistor_.start(always_persist, auto_persistence_interval, [this] { return persistNonVirtual(); });
db_.reset();
const auto working_dir = utils::getMinifiDir();
auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{working_dir}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
if (!encrypted_env) {
encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{working_dir}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME_OLD});
}
logger_->log_info("Using {} RocksDbStateStorage", encrypted_env ? "encrypted" : "plaintext");
auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& db_opts) {
minifi::internal::setCommonRocksDbOptions(db_opts);
if (encrypted_env) {
db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), core::repository::EncryptionEq{});
} else {
db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
}
};
// Use the same buffer settings as the FlowFileRepository
auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
cf_opts.write_buffer_size = 8ULL << 20U;
cf_opts.min_write_buffer_number_to_merge = 1;
};
db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_,
minifi::internal::getRocksDbOptionsToOverride(configuration_, Configure::nifi_state_storage_rocksdb_options));
if (db_->open()) {
logger_->log_trace("Successfully opened RocksDB database at {}", directory_.c_str());
} else {
// TODO(adebreceni) forward the status
logger_->log_error("Failed to open RocksDB database at {}, error", directory_.c_str());
return;
}
if (auto_persistor_.isAlwaysPersisting()) {
default_write_options.sync = true;
}
verify_checksums_in_rocksdb_reads_ = (configuration_->get(Configure::nifi_rocksdb_state_storage_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
logger_->log_trace("Enabled RocksDbStateStorage");
}
/**
 * Stop notification: halts the auto-persistor first, then releases the database handle.
 */
void RocksDbStateStorage::notifyStop() {
  auto_persistor_.stop();
  db_.reset();
}
bool RocksDbStateStorage::set(const std::string& key, const std::string& value) {
if (!db_) {
return false;
}
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::Status status = opendb->Put(default_write_options, key, value);
if (!status.ok()) {
logger_->log_error("Failed to Put key {} to RocksDB database at {}, error: {}", key.c_str(), directory_.c_str(), status.getState());
return false;
}
return true;
}
bool RocksDbStateStorage::get(const std::string& key, std::string& value) {
if (!db_) {
return false;
}
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
rocksdb::Status status = opendb->Get(options, key, &value);
if (!status.ok()) {
if (status.getState() != nullptr) {
logger_->log_error("Failed to Get key {} from RocksDB database at {}, error: {}", key.c_str(), directory_.c_str(), status.getState());
} else {
logger_->log_warn("Failed to Get key {} from RocksDB database at {} (it may not have been initialized yet)", key.c_str(), directory_.c_str());
}
return false;
}
return true;
}
bool RocksDbStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
if (!db_) {
return false;
}
auto opendb = db_->open();
if (!opendb) {
return false;
}
kvs.clear();
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
kvs.emplace(it->key().ToString(), it->value().ToString());
}
if (!it->status().ok()) {
logger_->log_error("Encountered error when iterating through RocksDB database at {}, error: {}", directory_.c_str(), it->status().getState());
return false;
}
return true;
}
bool RocksDbStateStorage::remove(const std::string& key) {
if (!db_) {
return false;
}
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::Status status = opendb->Delete(default_write_options, key);
if (!status.ok()) {
logger_->log_error("Failed to Delete from RocksDB database at {}, error: {}", directory_.c_str(), status.getState());
return false;
}
return true;
}
bool RocksDbStateStorage::clear() {
if (!db_) {
return false;
}
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rocksdb::Status status = opendb->Delete(default_write_options, it->key());
if (!status.ok()) {
logger_->log_error("Failed to Delete from RocksDB database at {}, error: {}", directory_.c_str(), status.getState());
return false;
}
}
if (!it->status().ok()) {
logger_->log_error("Encountered error when iterating through RocksDB database at {}, error: {}", directory_.c_str(), it->status().getState());
return false;
}
return true;
}
/**
 * Not supported by this storage.
 *
 * @return false if there is no database handle or the database could not be opened
 * @throws std::logic_error otherwise, i.e. whenever the database is available
 */
bool RocksDbStateStorage::update(const std::string& /*key*/, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& /*update_func*/) {
  // The availability checks come first, so callers only see the exception when the
  // database is available.
  if (!db_ || !db_->open()) {
    return false;
  }
  throw std::logic_error("Unsupported method");
}
/**
 * Persists pending writes; this is the callback handed to the auto-persistor in onEnable().
 *
 * @return false if there is no database handle or the database could not be opened;
 *         true without flushing when the auto-persistor is in always-persist mode;
 *         otherwise whether the synchronous WAL flush succeeded
 */
bool RocksDbStateStorage::persistNonVirtual() {
  if (!db_) {
    return false;
  }
  auto opendb = db_->open();
  if (!opendb) {
    return false;
  }
  // In always-persist mode no explicit flush is issued; short-circuiting keeps FlushWAL
  // from being called in that case.
  return auto_persistor_.isAlwaysPersisting() || opendb->FlushWAL(true /*sync*/).ok();
}
// Registers RocksDbStateStorage as a ControllerService under three names.
// NOTE(review): "RocksDbPersistableKeyValueStoreService" and its lowercase form look like
// legacy aliases kept for existing configurations - confirm before removing any of them.
REGISTER_RESOURCE_AS(RocksDbStateStorage, ControllerService, ("RocksDbPersistableKeyValueStoreService", "rocksdbpersistablekeyvaluestoreservice", "RocksDbStateStorage"));
} // namespace org::apache::nifi::minifi::controllers