blob: 1ab89f83cdbd56454daac1b8cb20e4e16a2ff784 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RocksDatabase.h"
#include <unordered_map>
#include <utility>
#include "core/logging/LoggerFactory.h"
#include "utils/StringUtils.h"
#include "RocksDbInstance.h"
namespace org::apache::nifi::minifi::internal {
/**
 * Creates a RocksDatabase handle for the database/column addressed by `uri`.
 *
 * Two uri forms are recognised:
 *  - "minifidb://<path><sep><column>": everything after the last separator is the column name,
 *    everything before it is the database path. '/' is searched first; '\\' is only consulted
 *    when the remainder contains no '/' at all.
 *  - anything else: the uri is used verbatim as the database path with the column "default".
 *
 * In RocksDbMode::ReadOnly a fresh RocksDbInstance is created on every call. In any other mode
 * instances are shared per database path through a function-local registry of weak pointers,
 * which is guarded by a function-local mutex.
 *
 * @param db_options_patch   forwarded to the RocksDatabase constructor
 * @param cf_options_patch   forwarded to the RocksDatabase constructor
 * @param uri                database location, optionally prefixed with "minifidb://"
 * @param db_config_override forwarded to the RocksDatabase constructor
 * @param mode               forwarded to RocksDbInstance; ReadOnly bypasses the registry
 * @return the new handle, or nullptr when a "minifidb://" uri contains no separator
 */
std::unique_ptr<RocksDatabase> RocksDatabase::create(const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch, const std::string& uri,
    const std::unordered_map<std::string, std::string>& db_config_override, RocksDbMode mode) {
  const std::string scheme = "minifidb://";
  auto logger = core::logging::LoggerFactory<RocksDatabase>::getLogger();
  logger->log_info("Acquiring database handle '{}'", uri);

  // defaults for a plain directory uri
  std::string db_path = uri;
  std::string db_column = "default";

  if (!utils::string::startsWith(uri, scheme)) {
    logger->log_info("Simple directory detected '{}', using as is", uri);
  } else {
    const std::string location = uri.substr(scheme.length());
    logger->log_info("RocksDB scheme is detected in '{}'", uri);

    // the column name is the trailing segment; fall back to '\\' only if there is no '/'
    std::string::size_type separator = location.rfind('/');
    if (separator == std::string::npos) {
      separator = location.rfind('\\');
    }
    if (separator == std::string::npos) {
      logger->log_error("Couldn't detect the column name in '{}'", uri);
      return nullptr;
    }

    db_path = location.substr(0, separator);
    db_column = location.substr(separator + 1);
    logger->log_info("Using column '{}' in rocksdb database '{}'", db_column, db_path);
  }

  if (mode == RocksDbMode::ReadOnly) {
    // read-only instances are never entered into the registry
    return std::make_unique<RocksDatabase>(std::make_shared<RocksDbInstance>(db_path, mode), db_column, db_options_patch, cf_options_patch, db_config_override);
  }

  // registry of instances by path; weak pointers so the registry itself does not keep an instance alive
  static std::mutex mtx;
  static std::unordered_map<std::string, std::weak_ptr<RocksDbInstance>> databases;

  std::shared_ptr<RocksDbInstance> instance;
  {
    std::lock_guard<std::mutex> guard(mtx);
    auto& registered = databases[db_path];
    instance = registered.lock();
    if (instance) {
      logger->log_info("Using previously opened rocksdb instance '{}'", db_path);
    } else {
      logger->log_info("Opening rocksdb database '{}'", db_path);
      instance = std::make_shared<RocksDbInstance>(db_path, mode);
      registered = instance;
    }
  }

  return std::make_unique<RocksDatabase>(instance, db_column, db_options_patch, cf_options_patch, db_config_override);
}
/**
 * Binds this handle to one column of a RocksDbInstance.
 *
 * Takes ownership of the `db` and `column` arguments (both are moved into members) and then
 * registers the column's configuration on the instance through registerColumnConfig.
 *
 * @param db                 instance this handle operates on; dereferenced unconditionally here
 * @param column             name of the column this handle refers to
 * @param db_options_patch   passed through to registerColumnConfig
 * @param cf_options_patch   passed through to registerColumnConfig
 * @param db_config_override passed through to registerColumnConfig
 */
RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch,
const std::unordered_map<std::string, std::string>& db_config_override)
: column_(std::move(column)), db_(std::move(db)) {
// uses the members rather than the parameters, which have been moved from at this point
db_->registerColumnConfig(column_, db_options_patch, cf_options_patch, db_config_override);
}
/**
 * Calls unregisterColumnConfig on the instance for this handle's column, mirroring the
 * registerColumnConfig call made in the constructor.
 */
RocksDatabase::~RocksDatabase() {
db_->unregisterColumnConfig(column_);
}
/**
 * Opens this handle's column by delegating to RocksDbInstance::open.
 *
 * @return whatever RocksDbInstance::open returns for this column.
 *         NOTE(review): an empty optional presumably signals that opening failed - confirm
 *         against RocksDbInstance::open.
 */
std::optional<OpenRocksDb> RocksDatabase::open() {
return db_->open(column_);
}
} // namespace org::apache::nifi::minifi::internal