blob: 0d055ead5afc4c5803ff4a3e0e512e71797a1e63 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FlowFileRepository.h"
#include <cinttypes>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "rocksdb/write_batch.h"
#include "FlowFileRecord.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace repository {
void FlowFileRepository::flush() {
rocksdb::WriteBatch batch;
std::string key;
std::string value;
rocksdb::ReadOptions options;
std::vector<std::shared_ptr<FlowFileRecord>> purgeList;
uint64_t decrement_total = 0;
while (keys_to_delete.size_approx() > 0) {
if (keys_to_delete.try_dequeue(key)) {
db_->Get(options, key, &value);
decrement_total += value.size();
std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
purgeList.push_back(eventRead);
}
logger_->log_debug("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath());
batch.Delete(key);
}
}
if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) {
logger_->log_trace("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load());
if (decrement_total > repo_size_.load()) {
repo_size_ = 0;
} else {
repo_size_ -= decrement_total;
}
}
if (nullptr != content_repo_) {
for (const auto &ffr : purgeList) {
auto claim = ffr->getResourceClaim();
if (claim != nullptr) {
content_repo_->removeIfOrphaned(claim);
}
}
}
}
/**
 * Periodic maintenance loop. While running_ is set it sleeps for
 * purge_period_ milliseconds, flushes pending deletions, and then sets
 * repo_full_ according to whether the reported repository size exceeds
 * max_partition_bytes_.
 */
void FlowFileRepository::run() {
  while (running_) {
    const auto pause = std::chrono::milliseconds(purge_period_);
    std::this_thread::sleep_for(pause);

    flush();

    // The repository counts as full once its size passes the purge threshold.
    const uint64_t current_size = getRepoSize();
    const uint64_t threshold = static_cast<uint64_t>(max_partition_bytes_);
    repo_full_ = current_size > threshold;
  }
}
/**
 * Stores the given content repository and walks every record currently in the
 * database.
 *
 * Each record's value size is added to repo_size_. A record that deserializes
 * and whose connection UUID is present in connectionMap is marked as stored
 * and put onto that connection. Any other record (unknown connection or
 * failed deserialization) is deleted from the database afterwards and its
 * size subtracted again; for an unknown connection the record's resource
 * claim is additionally removed from the content repository.
 *
 * @param content_repo content repository handed to the created flow file
 *                     records and used to remove unreferenced content.
 */
void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
  content_repo_ = content_repo;
  std::vector<std::pair<std::string, uint64_t>> purgeList;

  {
    // Owning the iterator through unique_ptr releases it even if the loop
    // body throws; the scope ends before any entry is deleted below.
    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
    repo_size_ = 0;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      auto eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
      std::string key = it->key().ToString();
      const auto value = it->value();
      const uint64_t value_size = value.size();
      repo_size_ += value_size;

      if (!eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) {
        // Unreadable record: schedule it for removal.
        purgeList.emplace_back(std::move(key), value_size);
        continue;
      }

      logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
      auto search = connectionMap.find(eventRead->getConnectionUuid());
      if (search != connectionMap.end()) {
        // The connection for the persisted flow file exists: enqueue the record on it.
        eventRead->setStoredToRepository(true);
        search->second->put(eventRead);
        continue;
      }

      logger_->log_warn("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
      // No owner for this record: drop its content (when a content repository
      // is available) and schedule the record itself for removal.
      if (nullptr != content_repo_ && eventRead->getContentFullPath().length() > 0) {
        auto claim = eventRead->getResourceClaim();
        if (nullptr != claim) {
          content_repo_->remove(claim);
        }
      }
      purgeList.emplace_back(std::move(key), value_size);
    }

    // An iterator also becomes invalid on a read error; surface that instead
    // of treating it as a clean end of data.
    const rocksdb::Status scan_status = it->status();
    if (!scan_status.ok()) {
      logger_->log_error("Repository Repo %s scan failed: %s", name_, scan_status.ToString());
    }
  }

  for (const auto &entry : purgeList) {
    logger_->log_debug("Repository Repo %s Purge %s", name_, entry.first);
    if (Delete(entry.first)) {
      repo_size_ -= entry.second;
    }
  }
}
} /* namespace repository */
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */