blob: 8bbd81b48719335b0cabe9125d5b1e5bc696fb92 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/repository/VolatileRepository.h"
#include <map>
#include <memory>
#include <limits>
#include <string>
#include <vector>
#include "FlowFileRecord.h"
namespace org::apache::nifi::minifi::core::repository {
void VolatileRepository::loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
}
bool VolatileRepository::initialize(const std::shared_ptr<Configure> &configure) {
repo_data_.initialize(configure, core::ThreadedRepository::getName());
logging::LOG_INFO(logger_) << "Resizing value_vector for " << core::ThreadedRepository::getName() << " count is " << repo_data_.max_count;
logging::LOG_INFO(logger_) << "Using a maximum size for " << core::ThreadedRepository::getName() << " of " << repo_data_.max_size;
return true;
}
bool VolatileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) {
RepoValue<std::string> new_value(key, buf, bufLen);
const size_t size = new_value.size();
bool updated = false;
size_t reclaimed_size = 0;
RepoValue<std::string> old_value;
do {
uint32_t private_index = current_index_.fetch_add(1);
// round robin through the beginning
if (private_index >= repo_data_.max_count) {
uint32_t new_index = private_index + 1;
if (current_index_.compare_exchange_weak(new_index, 1)) {
private_index = 0;
} else {
continue;
}
}
updated = repo_data_.value_vector.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size);
logger_->log_debug("Set repo value at %u out of %u updated %u current_size %u, adding %u to %u",
private_index, repo_data_.max_count, updated, reclaimed_size, size, repo_data_.current_size.load());
if (updated && reclaimed_size > 0) {
emplace(old_value);
}
if (reclaimed_size > 0) {
/**
* this is okay since current_size is really an estimate.
* we don't need precise counts.
*/
if (repo_data_.current_size < reclaimed_size) {
repo_data_.current_size = 0;
} else {
repo_data_.current_size -= reclaimed_size;
}
}
} while (!updated);
repo_data_.current_size += size;
logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, repo_data_.current_size.load(), current_index_.load());
return true;
}
bool VolatileRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>>& data) {
for (const auto& item : data) {
if (!Put(item.first, item.second->getBuffer().template as_span<const uint8_t>().data(), item.second->size())) {
return false;
}
}
return true;
}
bool VolatileRepository::Delete(const std::string& key) {
logger_->log_debug("Delete from volatile");
for (auto ent : repo_data_.value_vector) {
// let the destructor do the cleanup
RepoValue<std::string> value;
if (ent->getValue(key, value)) {
repo_data_.current_size -= value.size();
logger_->log_debug("Delete and pushed into purge_list from volatile");
emplace(value);
return true;
}
}
return false;
}
bool VolatileRepository::Get(const std::string &key, std::string &value) {
for (auto ent : repo_data_.value_vector) {
// let the destructor do the cleanup
RepoValue<std::string> repo_value;
if (ent->getValue(key, repo_value)) {
repo_data_.current_size -= value.size();
repo_value.emplace(value);
return true;
}
}
return false;
}
bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store,
size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
size_t requested_batch = max_size;
max_size = 0;
for (auto ent : repo_data_.value_vector) {
// let the destructor do the cleanup
RepoValue<std::string> repo_value;
if (ent->getValue(repo_value)) {
std::shared_ptr<core::SerializableComponent> newComponent = lambda();
// we've taken ownership of this repo value
newComponent->DeSerialize(repo_value.getBuffer());
store.push_back(newComponent);
repo_data_.current_size -= repo_value.getBuffer().size();
if (max_size++ >= requested_batch) {
break;
}
}
}
return max_size > 0;
}
bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
logger_->log_debug("VolatileRepository -- DeSerialize %u", repo_data_.current_size.load());
max_size = 0;
for (auto ent : repo_data_.value_vector) {
// let the destructor do the cleanup
RepoValue<std::string> repo_value;
if (ent->getValue(repo_value)) {
// we've taken ownership of this repo value
store.at(max_size)->DeSerialize(repo_value.getBuffer());
repo_data_.current_size -= repo_value.getBuffer().size();
if (max_size++ >= store.size()) {
break;
}
}
}
return max_size > 0;
}
} // namespace org::apache::nifi::minifi::core::repository