/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "FlowFileRepository.h"

#include <chrono>
#include <future>
#include <list>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "FlowFileRecord.h"
#include "core/Resource.h"
#include "core/TypedValues.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "utils/Locations.h"
#include "utils/OptionalUtils.h"
#include "utils/gsl.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::core::repository {

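// Drains the queue of flow files scheduled for deletion, removes them from the
// database in a single write batch, and releases their content claims on success.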
void FlowFileRepository::flush() {
  auto opendb = db_->open();
  if (!opendb) {
    return;
  }
  auto batch = opendb->createWriteBatch();

  std::list<ExpiredFlowFileInfo> flow_files;

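  // Drain the delete queue into a local list; each entry holds a key and, if known, its content claim.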
  while (keys_to_delete_.size_approx() > 0) {
    ExpiredFlowFileInfo info;
    if (keys_to_delete_.try_dequeue(info)) {
      flow_files.push_back(std::move(info));
    }
  }

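  // Entries queued without a content claim (e.g. via Delete(key)) are deserialized from the
  // database so that their claims can be released as well.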
  deserializeFlowFilesWithNoContentClaim(opendb.value(), flow_files);

  for (auto& ff : flow_files) {
    batch.Delete(ff.key);
    logger_->log_debug("Issuing batch delete, including {}, content path {}", ff.key, ff.content ? ff.content->getContentFullPath() : "null");
  }

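  // Commit the batch delete, retrying on transient failures.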
  auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };

  if (!ExecuteWithRetry(operation)) {
    for (auto&& ff : flow_files) {
      keys_to_delete_.enqueue(std::move(ff));
    }
    return;  // Stop here - don't delete from content repo while we have records in FF repo
  }

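  // The records are gone from the flow file repository; release the content claims
  // so the content repository can reclaim the underlying data.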
  if (content_repo_) {
    for (auto& ff : flow_files) {
      if (ff.content) {
        ff.content->decreaseFlowFileRecordOwnedCount();
      }
    }
  }
}

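// For expired flow files that lack a content claim, reads the serialized records from
// the database in a single MultiGet and recovers their resource claims. Entries whose
// records cannot be read are removed from the list.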
void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files) {
  std::vector<rocksdb::Slice> keys;
  std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions;
  for (auto it = flow_files.begin(); it != flow_files.end(); ++it) {
    if (!it->content) {
      keys.push_back(it->key);
      key_positions.push_back(it);
    }
  }
  if (keys.empty()) {
    return;
  }
  std::vector<std::string> values;
  rocksdb::ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads_;
  auto multistatus = opendb.MultiGet(options, keys, &values);
  gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size());

  for (size_t i = 0; i < keys.size(); ++i) {
    if (!multistatus[i].ok()) {
      logger_->log_error("Failed to read key {} from rocksdb! The DB is most likely in an inconsistent state!", keys[i].ToString());
      flow_files.erase(key_positions.at(i));
      continue;
    }

    utils::Identifier container_id;
    auto flow_file = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, container_id);
    if (flow_file) {
      gsl_Expects(flow_file->getUUIDStr() == key_positions.at(i)->key);
      key_positions.at(i)->content = flow_file->getResourceClaim();
    } else {
      logger_->log_error("Could not deserialize flow file {}", key_positions.at(i)->key);
    }
  }
}

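// Purge thread body: flushes the delete queue every purge_period_, and once more on shutdown.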
void FlowFileRepository::run() {
  while (isRunning()) {
    std::this_thread::sleep_for(purge_period_);
    flush();
  }
  flush();
}

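// Returns true if the claimed content stream is large enough to cover the flow file's
// [offset, offset + size) range; a missing claim passes only for empty flow files.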
bool FlowFileRepository::contentSizeIsAmpleForFlowFile(const FlowFile& flow_file_record, const std::shared_ptr<ResourceClaim>& resource_claim) const {
  const auto stream_size = resource_claim ? content_repo_->size(*resource_claim) : 0;
  const auto required_size = flow_file_record.getOffset() + flow_file_record.getSize();
  return stream_size >= required_size;
}

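// Finds the container (typically a connection) that a restored flow file belongs to.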
Connectable* FlowFileRepository::getContainer(const std::string& container_id) {
  auto container = containers_.find(container_id);
  if (container != containers_.end())
    return container->second;
  // for backward compatibility
  container = connection_map_.find(container_id);
  if (container != connection_map_.end())
    return container->second;
  return nullptr;
}

// Loads all flow files persisted in the database, restores the ones whose owning
// connection still exists, and queues the rest for deletion.
void FlowFileRepository::initialize_repository() {
  auto opendb = db_->open();
  if (!opendb) {
    logger_->log_trace("Couldn't open database to load existing flow files");
    return;
  }
  logger_->log_info("Reading existing flow files from database");

  rocksdb::ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads_;
  const auto it = opendb->NewIterator(options);
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    utils::Identifier container_id;
    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id);
    const std::string key = it->key().ToString();
    if (!eventRead) {
      // failed to deserialize the FlowFile; its content claim is unknown, so only the key can be deleted
      keys_to_delete_.enqueue({.key = key});
      continue;
    }
    auto claim = eventRead->getResourceClaim();
    if (claim) {
      claim->increaseFlowFileRecordOwnedCount();
    }
    const auto container = getContainer(container_id.to_string());
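    // without a live owning connection the flow file cannot be restored, so schedule it for deletion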
    if (!container) {
      logger_->log_warn("Could not find connection for {}, path {}", container_id.to_string(), eventRead->getContentFullPath());
      keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
      continue;
    }
    if (check_flowfile_content_size_ && !contentSizeIsAmpleForFlowFile(*eventRead, claim)) {
      logger_->log_warn("Content is missing or too small for flowfile {}", eventRead->getContentFullPath());
      keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
      continue;
    }

    logger_->log_debug("Found connection for {}, path {}", container_id.to_string(), eventRead->getContentFullPath());
    eventRead->setStoredToRepository(true);
    // we found the owning connection for the persisted flow file;
    // even if a processor immediately marks it for deletion, the delete only takes effect at the flush() below
    container->restore(eventRead);
  }
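  // persist the deletions queued above, then drop content that no flow file references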
  flush();
  content_repo_->clearOrphans();
}

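// Attaches the content repository, creates the swap loader, and restores persisted flow files.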
void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
  content_repo_ = content_repo;
  swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_, verify_checksums_in_rocksdb_reads_);

  initialize_repository();
}

namespace {
bool getRepositoryCheckHealth(const Configure& configure) {
  std::string check_health_str;
  configure.get(Configure::nifi_flow_file_repository_check_health, check_health_str);
  return utils::string::toBool(check_health_str).value_or(true);
}
}  // namespace

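// Reads the repository configuration, sets up (optionally encrypted) RocksDB options,
// and opens the database.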
bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) {
  config_ = configure;
  std::string value;

  if (configure->get(Configure::nifi_flowfile_repository_directory_default, value) && !value.empty()) {
    directory_ = value;
  }
  check_flowfile_content_size_ = getRepositoryCheckHealth(*configure);
  logger_->log_debug("NiFi FlowFile Repository Directory {}", directory_);

  setCompactionPeriod(configure);

  const auto working_dir = utils::getMinifiDir();

  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{working_dir}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
  logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");

  verify_checksums_in_rocksdb_reads_ = (configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
  logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");

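  // Use the encrypting Env when a repository key is available; fall back to the default Env otherwise.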
  auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
    minifi::internal::setCommonRocksDbOptions(options);
    if (encrypted_env) {
      options.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
    } else {
      options.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
    }
  };

  // Write buffers are used as db operation logs. When they fill up, the events are merged and serialized.
  // The default size is 64MB, which is usually too much for us: it causes a sawtooth pattern in memory
  // consumption, sometimes exceeding the footprint of the rest of MiniFi.
  // To avoid write issues during heavy load, it is recommended to have a high number of buffers;
  // RocksDB's write stall feature can also trigger when the number of buffers is >= 3.
  // The more buffers we have, the more memory RocksDB can utilize under heavy load, without significant
  // memory consumption under low load.
  auto cf_options = [&configure] (rocksdb::ColumnFamilyOptions& cf_opts) {
    cf_opts.OptimizeForPointLookup(4);
    cf_opts.write_buffer_size = 8ULL << 20U;
    cf_opts.max_write_buffer_number = 20;
    cf_opts.min_write_buffer_number_to_merge = 1;
    if (auto compression_type = minifi::internal::readConfiguredCompressionType(configure, Configure::nifi_flow_repository_rocksdb_compression)) {
      cf_opts.compression = *compression_type;
    }
  };
  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_,
      minifi::internal::getRocksDbOptionsToOverride(configure, Configure::nifi_flowfile_repository_rocksdb_options));
  if (db_->open()) {
    logger_->log_debug("NiFi FlowFile Repository database at {} opened successfully", directory_);
    return true;
  } else {
    logger_->log_error("NiFi FlowFile Repository database at {} failed to open", directory_);
    return false;
  }
}

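// Parses the configured compaction period; a value of 0 disables periodic forced compaction.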
void FlowFileRepository::setCompactionPeriod(const std::shared_ptr<Configure> &configure) {
  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
  if (auto compaction_period_str = configure->get(Configure::nifi_flowfile_repository_rocksdb_compaction_period)) {
    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
      compaction_period_ = compaction_period->getMilliseconds();
      if (compaction_period_.count() == 0) {
        logger_->log_warn("Setting '{}' to 0 disables forced compaction", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
      }
    } else {
      logger_->log_error("Malformed property '{}', expected time period, using default", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
    }
  } else {
    logger_->log_debug("Using default compaction period of {}", compaction_period_);
  }
}

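// Deletion is deferred: the key is queued here and removed from the database at the next flush().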
bool FlowFileRepository::Delete(const std::string& key) {
  keys_to_delete_.enqueue({.key = key});
  return true;
}

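// Compaction thread body: forces a RocksDB compaction every compaction_period_ until a stop is requested.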
void FlowFileRepository::runCompaction() {
  do {
    if (auto opendb = db_->open()) {
      auto status = opendb->RunCompaction();
      logger_->log_trace("Compaction triggered: {}", status.ToString());
    } else {
      logger_->log_error("Failed to open database for compaction");
    }
  } while (!utils::StoppableThread::waitForStopRequest(compaction_period_));
}

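// Starts the purge thread, the swap loader and, if a non-zero compaction period is set, the compaction thread.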
bool FlowFileRepository::start() {
  const bool ret = ThreadedRepositoryImpl::start();
  if (swap_loader_) {
    swap_loader_->start();
  }
  if (compaction_period_.count() != 0) {
    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
      runCompaction();
    });
  }
  return ret;
}

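// Stops the compaction thread and the swap loader before the base repository's purge thread.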
bool FlowFileRepository::stop() {
  compaction_thread_.reset();
  if (swap_loader_) {
    swap_loader_->stop();
  }
  return ThreadedRepositoryImpl::stop();
}

void FlowFileRepository::store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) {
  gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
  // no-op: the flow files have already been persisted to the repository
}

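// Asynchronously loads swapped-out flow files back from the database via the swap loader.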
std::future<std::vector<std::shared_ptr<core::FlowFile>>> FlowFileRepository::load(std::vector<SwappedFlowFile> flow_files) {
  return swap_loader_->load(std::move(flow_files));
}

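// Queues the component for deferred deletion, capturing the content claim when it is a flow file.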
bool FlowFileRepository::Delete(const std::shared_ptr<core::CoreComponent>& item) {
  if (auto ff = std::dynamic_pointer_cast<core::FlowFile>(item)) {
    keys_to_delete_.enqueue({.key = item->getUUIDStr(), .content = ff->getResourceClaim()});
  } else {
    keys_to_delete_.enqueue({.key = item->getUUIDStr()});
  }
  return true;
}

REGISTER_RESOURCE_AS(FlowFileRepository, InternalResource, ("FlowFileRepository", "flowfilerepository"));

}  // namespace org::apache::nifi::minifi::core::repository