/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <utility>
#include <vector>
#include <string>
#include <memory>
#include "utils/file/FileUtils.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/checkpoint.h"
#include "core/Core.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/ThreadedRepository.h"
#include "Connection.h"
#include "concurrentqueue.h"
#include "database/RocksDatabase.h"
#include "encryption/RocksDbEncryptionProvider.h"
#include "utils/crypto/EncryptionProvider.h"
#include "SwapManager.h"
#include "FlowFileLoader.h"
#include "range/v3/algorithm/all_of.hpp"
#include "utils/Literals.h"
namespace org::apache::nifi::minifi::core::repository {
#ifdef WIN32
constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = ".\\flowfile_repository";
constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = ".\\flowfile_checkpoint";
#else
constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = "./flowfile_repository";
constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint";
#endif
constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10_MiB;
constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2);
constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::milliseconds(500);
/**
 * Flow File repository
 * Design: Extends ThreadedRepository and implements the run function, using RocksDB as the primary substrate.
 */
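//
// A minimal usage sketch (assumptions: `configure` is an initialized Configure instance and
// `content_repo` is a ContentRepository; both names are illustrative and not part of this header):
//
//   auto repo = std::make_shared<FlowFileRepository>("flowfilerepository");
//   repo->initialize(configure);        // opens or creates the RocksDB instance
//   repo->loadComponent(content_repo);  // wires in the content repository and loads persisted flow files
//   repo->start();                      // starts the repository thread and the swap loader
//   // ... agent runs ...
//   repo->stop();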
class FlowFileRepository : public ThreadedRepository, public SwapManager {
public:
static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key";
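// Illustrative configuration entry enabling repository encryption (the key is read via the
// EncryptionManager from the agent's configuration; the value below is a placeholder, not a real key):
//   nifi.flowfile.repository.encryption.key=<hex-encoded key>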
FlowFileRepository(std::string name, const utils::Identifier& /*uuid*/)
: FlowFileRepository(std::move(name)) {
}
explicit FlowFileRepository(const std::string& repo_name = "",
std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod),
checkpoint_dir_(std::move(checkpoint_dir)),
content_repo_(nullptr),
checkpoint_(nullptr),
logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
db_ = nullptr;
}
~FlowFileRepository() override {
stop();
}
static auto properties() { return std::array<core::Property, 0>{}; }
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
bool isNoop() const override {
return false;
}
void flush() override;
virtual void printStats();
bool initialize(const std::shared_ptr<Configure> &configure) override {
config_ = configure;
std::string value;
if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) {
directory_ = value;
}
logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
options.set(&rocksdb::DBOptions::create_if_missing, true);
options.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
options.set(&rocksdb::DBOptions::use_direct_reads, true);
if (encrypted_env) {
options.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
} else {
options.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
}
};
// Write buffers are used as DB operation logs. When they fill up, the events are merged and serialized.
// The default size is 64 MB, which is usually too much in our case and causes a sawtooth pattern in
// memory consumption (it can consume more memory than the rest of MiNiFi).
// To avoid DB write issues during heavy load, it is recommended to have a high number of buffers.
// RocksDB's write stall feature can also be triggered when the number of buffers is >= 3.
// The more buffers we have, the more memory RocksDB can utilize under load, without significant memory consumption at low load.
auto cf_options = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
cf_opts.OptimizeForPointLookup(4);
cf_opts.write_buffer_size = 8ULL << 20U;
cf_opts.max_write_buffer_number = 20;
cf_opts.min_write_buffer_number_to_merge = 1;
};
db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_);
if (db_->open()) {
logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
return true;
} else {
logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
return false;
}
}
bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override {
// persist the entry to the DB
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::Slice value(reinterpret_cast<const char*>(buf), bufLen);
auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); };
return ExecuteWithRetry(operation);
}
bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
auto opendb = db_->open();
if (!opendb) {
return false;
}
auto batch = opendb->createWriteBatch();
for (const auto &item : data) {
const auto buf = item.second->getBuffer().as_span<const char>();
rocksdb::Slice value(buf.data(), buf.size());
if (!batch.Put(item.first, value).ok()) {
logger_->log_error("Failed to add item to batch operation");
return false;
}
}
auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
return ExecuteWithRetry(operation);
}
/**
 * Enqueues the key for deferred deletion; the actual removal from the DB happens later, when the queue is drained
 * @return true, as enqueueing always succeeds
 */
bool Delete(const std::string& key) override {
keys_to_delete.enqueue(key);
return true;
}
/**
 * Retrieves the value stored under the provided key
 * @return status of the get operation.
 */
bool Get(const std::string &key, std::string &value) override {
auto opendb = db_->open();
if (!opendb) {
return false;
}
return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
}
void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override;
bool start() override {
const bool ret = ThreadedRepository::start();
if (swap_loader_) {
swap_loader_->start();
}
return ret;
}
bool stop() override {
if (swap_loader_) {
swap_loader_->stop();
}
return ThreadedRepository::stop();
}
void store([[maybe_unused]] std::vector<std::shared_ptr<core::FlowFile>> flow_files) override {
gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
// pass, flowfiles are already persisted in the repository
}
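// Restores previously swapped-out flow files asynchronously; the heavy lifting is delegated to the
// FlowFileLoader, which reads the serialized flow files back from the DB.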
std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<SwappedFlowFile> flow_files) override {
return swap_loader_->load(std::move(flow_files));
}
private:
void run() override;
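// Retries the given DB operation until it succeeds; the wait between attempts is expected to grow by
// FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS (see the implementation for the exact policy).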
bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
void initialize_repository();
/**
 * Determines whether a checkpoint of the database is needed at startup
 * @return true if a checkpoint is needed.
 */
static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb);
void prune_stored_flowfiles();
std::thread& getThread() override {
return thread_;
}
std::filesystem::path checkpoint_dir_;
moodycamel::ConcurrentQueue<std::string> keys_to_delete;
std::shared_ptr<core::ContentRepository> content_repo_;
std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
std::unique_ptr<FlowFileLoader> swap_loader_;
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<minifi::Configure> config_;
std::thread thread_;
};
} // namespace org::apache::nifi::minifi::core::repository