blob: 4235a2309292e0ed5ec721853ac9123ab5ec03b2 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
namespace repository {
/**
* Purpose: Repo value represents an item that will support a move operation within an AtomicEntry
*
* Justification: Since AtomicEntry is a static entry that does not move or change, the underlying
* RepoValue can be changed to support atomic operations.
*/
template<typename T>
class RepoValue {
 public:
  /**
   * Creates an empty value: value-initialized key, no custom comparator and
   * an empty buffer.
   */
  RepoValue()
      : key_(),
        comparator_(nullptr),
        buffer_() {
  }

  /**
   * Constructor that populates the item allowing for a custom key comparator.
   * The bytes in [ptr, ptr + size) are copied into the internal buffer.
   * @param key key for this repo value.
   * @param ptr buffer to copy from; when nullptr the value starts with an empty buffer.
   * @param size number of bytes to copy from ptr.
   * @param comparator custom key comparator; when empty, operator== is used.
   */
  explicit RepoValue(T key, const uint8_t *ptr, size_t size, std::function<bool(T, T)> comparator = nullptr)
      : key_(std::move(key)),
        comparator_(std::move(comparator)),
        buffer_() {
    if (ptr != nullptr && size > 0) {
      buffer_.assign(ptr, ptr + size);
    }
  }

  /**
   * Move constructor: takes over the key, comparator and buffer of other.
   * Initializers are listed in member declaration order.
   */
  RepoValue(RepoValue<T> &&other) noexcept
      : key_(std::move(other.key_)),
        comparator_(std::move(other.comparator_)),
        buffer_(std::move(other.buffer_)) {
  }

  // A RepoValue is move-only (this was already implied by the user-declared
  // move operations; it is spelled out here for readers).
  RepoValue(const RepoValue<T> &other) = delete;
  RepoValue<T> &operator=(const RepoValue<T> &other) = delete;

  ~RepoValue() = default;

  /**
   * @return mutable reference to the key.
   */
  T &getKey() {
    return key_;
  }

  /**
   * @return read-only reference to the key.
   */
  const T &getKey() const {
    return key_;
  }

  /**
   * Sets the key and replaces the comparator. Note that omitting the
   * comparator argument resets comparison to operator==.
   * @param key new key.
   * @param comparator new custom comparator, or empty for operator==.
   */
  void setKey(const T key, std::function<bool(T, T)> comparator = nullptr) {
    key_ = key;
    comparator_ = std::move(comparator);
  }

  /**
   * Determines if the key of other is the same as this key, using the custom
   * comparator of this object when one is set.
   * @param other object to compare against; nullptr never compares equal.
   * @return result of the comparison
   */
  inline bool isEqual(const RepoValue<T> *other) const {
    if (other == nullptr) {
      return false;
    }
    return isKey(other->key_);
  }

  /**
   * Determines if other is the same as this key, using the custom comparator
   * when one is set and operator== otherwise.
   * @param other key to compare against
   * @return result of the comparison
   */
  inline bool isKey(T other) const {
    if (comparator_ == nullptr) {
      return key_ == other;
    }
    return comparator_(key_, other);
  }

  /**
   * Clears the buffer. The key and comparator are left untouched.
   */
  void clearBuffer() {
    buffer_.clear();
  }

  /**
   * @return the number of bytes currently held in the buffer (same value as
   * getBufferSize(), widened to 64 bits).
   */
  uint64_t size() const {
    return buffer_.size();
  }

  /**
   * @return the number of bytes currently held in the buffer.
   */
  size_t getBufferSize() const {
    return buffer_.size();
  }

  /**
   * @return pointer to the first byte of the buffer. The pointer is only
   * meaningful for getBufferSize() bytes and is invalidated by append(),
   * clearBuffer() and by moving from this object.
   */
  const uint8_t *getBuffer() const {
    return buffer_.data();
  }

  /**
   * Places the contents of buffer at the front of str.
   * @param str string into which we are placing the memory contained in buffer.
   */
  void emplace(std::string &str) const {
    // An empty vector may report a null data() pointer; skip the call rather
    // than hand a null pointer to std::string::insert.
    if (buffer_.empty()) {
      return;
    }
    str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size());
  }

  /**
   * Appends ptr to the end of buffer.
   * @param ptr pointer containing data to add to buffer_; nullptr is ignored.
   * @param size number of bytes to append.
   */
  void append(const uint8_t *ptr, size_t size) {
    if (ptr == nullptr || size == 0) {
      return;
    }
    buffer_.insert(buffer_.end(), ptr, ptr + size);
  }

  /**
   * Move assignment: takes over the key, comparator and buffer of other.
   * The comparator must travel with the key, otherwise a value moved into
   * place would compare keys with operator== instead of the custom rule.
   */
  RepoValue<T> &operator=(RepoValue<T> &&other) noexcept {
    if (this != &other) {
      key_ = std::move(other.key_);
      comparator_ = std::move(other.comparator_);
      buffer_ = std::move(other.buffer_);
    }
    return *this;
  }

 private:
  // key identifying this value.
  T key_;
  // optional custom key comparator; empty means operator==.
  std::function<bool(T, T)> comparator_;
  // payload bytes.
  std::vector<uint8_t> buffer_;
};
/**
 * Purpose: AtomicEntry is a single, fixed slot that holds one RepoValue and
 * guards it with a spin lock, so that a statically sized ring buffer can be
 * built from a pre-allocated collection of entries.
 **/
template<typename T>
class AtomicEntry {
 public:
  /**
   * Constructor that accepts a max size and an atomic counter for the total
   * size allowed by this and other atomic entries.
   * @param total_size shared byte counter updated as buffers grow or are
   * freed; may be nullptr, in which case no accounting is performed.
   * @param max_size pointer to the byte limit that insert() compares against
   * *total_size; the limit is only enforced when both pointers are non-null.
   */
  explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size)
      : accumulated_repo_size_(total_size),
        max_repo_size_(max_size),
        write_pending_(false),
        has_value_(false),
        ref_count_(0),
        free_required(false) {
  }

  /**
   * Sets the repo value, moving the old value into old_value.
   * This call does not wait: if the entry is locked by another caller it
   * returns false immediately.
   * @param new_value new value to move into value_.
   * @param old_value the previous value of value_ will be moved into old_value
   * @param prev_size size reclaimed; only written when the entry held a value.
   * @return result of this set. If true old_value will be populated.
   */
  bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, size_t &prev_size) {
    if (!try_acquire()) {
      return false;
    }
    LockGuard guard(*this);
    if (has_value_) {
      prev_size = value_.getBufferSize();
    }
    old_value = std::move(value_);
    value_ = std::move(new_value);
    has_value_ = true;
    return true;
  }

  /**
   * Increments the reference count of this entry.
   * This call does not wait: if the entry is locked by another caller it
   * returns nullptr immediately.
   * @return this entry on success, nullptr when the lock could not be taken.
   */
  AtomicEntry<T> *takeOwnership() {
    if (!try_acquire()) {
      return nullptr;
    }
    LockGuard guard(*this);
    ref_count_++;
    return this;
  }

  /**
   * A test and set operation, which is used to allow a function to test
   * if an item can be released and a function used for reclaiming memory associated
   * with said object.
   * A custom comparator can be provided to augment the key being added into value_
   *
   * When the entry already holds a value, the key is only replaced if either
   * releaseTest and reclaimer are both provided and releaseTest accepts the
   * current key (reclaimer is then invoked with it), or a free was requested
   * earlier and no references remain. Otherwise the entry is left untouched.
   * This call does not wait: if the entry is locked by another caller it
   * returns false immediately.
   * @param str key to store.
   * @param releaseTest optional predicate deciding whether the current key may be released.
   * @param reclaimer optional callback invoked with the released key.
   * @param comparator optional custom comparator stored together with the key.
   * @return true if the key was stored in this entry.
   */
  bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> comparator = nullptr) {
    if (!try_acquire()) {
      return false;
    }
    LockGuard guard(*this);
    if (has_value_) {
      if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) {
        reclaimer(value_.getKey());
      } else if (free_required && ref_count_ == 0) {
        reclaimBuffer();
        free_required = false;
      } else {
        // we either don't have a release test or we cannot release this
        // entity
        return false;
      }
    }
    ref_count_ = 1;
    value_.setKey(str, comparator);
    has_value_ = true;
    return true;
  }

  /**
   * Moves the value into the argument
   * @param value the previous value will be moved into this parameter
   * @return success of get operation based on whether or not this atomic entry has a value.
   */
  bool getValue(RepoValue<T> &value) {
    try_lock();
    LockGuard guard(*this);
    if (!has_value_) {
      return false;
    }
    value = std::move(value_);
    has_value_ = false;
    return true;
  }

  /**
   * Moves the value into the argument if the stored key matches key.
   * @param key key the stored value must match.
   * @param value the previous value will be moved into this parameter
   * @return true if this atomic entry had a value with a matching key.
   */
  bool getValue(const T &key, RepoValue<T> &value) {
    try_lock();
    LockGuard guard(*this);
    if (!has_value_ || !value_.isKey(key)) {
      return false;
    }
    value = std::move(value_);
    has_value_ = false;
    return true;
  }

  /**
   * Drops one reference taken through takeOwnership() or the pointer variant
   * of getValue(). When the count reaches zero and a free was requested via
   * freeValue(), the buffer is released here. Does nothing when the entry
   * holds no value.
   */
  void decrementOwnership() {
    try_lock();
    LockGuard guard(*this);
    if (!has_value_) {
      return;
    }
    if (ref_count_ > 0) {
      ref_count_--;
    }
    if (ref_count_ == 0 && free_required) {
      reclaimBuffer();
      free_required = false;
    }
  }

  /**
   * Exposes the stored value in place if the stored key matches key, and
   * increments the reference count. The caller is expected to call
   * decrementOwnership() when done with the pointer.
   * NOTE(review): the returned pointer is used by the caller after the entry
   * lock has been released - confirm callers rely on the reference count only.
   * @param key key the stored value must match.
   * @param value receives the address of the stored value on success.
   * @return true if this atomic entry had a value with a matching key.
   */
  bool getValue(const T &key, RepoValue<T> **value) {
    try_lock();
    LockGuard guard(*this);
    if (!has_value_ || !value_.isKey(key)) {
      return false;
    }
    ref_count_++;
    *value = &value_;
    return true;
  }

  /**
   * Operation that will be used to test and free if a release is required without
   * setting a new object.
   * @param releaseTest function that will be used to free the RepoValue key from
   * this atomic entry.
   * @param freedValue informs the caller if an item was freed: set to true
   * when the value was released and to false otherwise.
   * @return the key that was freed, or a value-initialized T when nothing was freed.
   */
  T testAndFree(std::function<bool(T)> releaseTest, bool &freedValue) {
    freedValue = false;
    T ref{};
    try_lock();
    LockGuard guard(*this);
    if (!has_value_) {
      return ref;
    }
    if (releaseTest(value_.getKey())) {
      ref = value_.getKey();
      reclaimBuffer();
      freedValue = true;
    }
    return ref;
  }

  /**
   * @return number of bytes currently stored in the value's buffer.
   */
  size_t getLength() {
    try_lock();
    LockGuard guard(*this);
    return value_.getBufferSize();
  }

  /**
   * sets has_value to false; however, does not call
   * any external entity to further free RepoValue.
   * If references are still outstanding the free is deferred: it is recorded
   * and performed once the reference count drops to zero.
   * @param key key the stored value must match.
   * @return true if the value was freed or scheduled to be freed.
   */
  bool freeValue(const T &key) {
    try_lock();
    LockGuard guard(*this);
    if (!has_value_ || !value_.isKey(key)) {
      return false;
    }
    if (ref_count_ > 0) {
      free_required = true;
      return true;
    }
    reclaimBuffer();
    free_required = false;
    return true;
  }

  /**
   * Appends buffer onto this atomic entry if key matches
   * the current RepoValue's key.
   * The write is refused when both the shared counter and the limit are
   * configured and the write would push the counter past the limit.
   * @param key key the stored value must match.
   * @param buffer bytes to append.
   * @param size number of bytes to append.
   * @return true if the bytes were appended.
   */
  bool insert(const T key, uint8_t *buffer, size_t size) {
    try_lock();
    LockGuard guard(*this);
    if (!has_value_ || !value_.isKey(key)) {
      return false;
    }
    if (accumulated_repo_size_ != nullptr && max_repo_size_ != nullptr && (*accumulated_repo_size_ + size > *max_repo_size_)) {
      // can't support this write
      return false;
    }
    value_.append(buffer, size);
    // The counter is optional everywhere else in this class; honour that here
    // as well instead of dereferencing it unconditionally.
    if (accumulated_repo_size_ != nullptr) {
      *accumulated_repo_size_ += size;
    }
    return true;
  }

 private:
  /**
   * Releases the entry lock when it goes out of scope. It adopts a lock that
   * the caller has already acquired through try_lock() or try_acquire(), so
   * every return path and any exception thrown by a user supplied callback
   * or an allocation leaves the entry unlocked.
   */
  class LockGuard {
   public:
    explicit LockGuard(AtomicEntry<T> &entry)
        : entry_(entry) {
    }
    ~LockGuard() {
      entry_.try_unlock();
    }
    LockGuard(const LockGuard &other) = delete;
    LockGuard &operator=(const LockGuard &other) = delete;

   private:
    AtomicEntry<T> &entry_;
  };

  /**
   * Spin lock to lock the current atomic entry; loops until the lock is taken.
   */
  inline void try_lock() {
    bool expected = false;
    while (!write_pending_.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
      // a failed exchange overwrites expected with the observed value
      expected = false;
    }
  }

  /**
   * Single attempt to take the lock without waiting. The strong exchange is
   * used because the weak form may fail spuriously, which outside of a retry
   * loop would report the entry as busy although it is free.
   * @return true if the lock was acquired.
   */
  inline bool try_acquire() {
    bool expected = false;
    return write_pending_.compare_exchange_strong(expected, true, std::memory_order_acquire);
  }

  /**
   * Unlocks the current atomic entry. Must only be called by the holder of
   * the lock. Release ordering publishes the writes made while the lock was
   * held to the next caller that acquires it.
   */
  inline void try_unlock() {
    write_pending_.store(false, std::memory_order_release);
  }

  /**
   * Drops the stored buffer, marks the entry as empty and gives the bytes
   * back to the shared counter when one is configured. Must be called with
   * the lock held.
   */
  inline void reclaimBuffer() {
    const size_t bufferSize = value_.getBufferSize();
    value_.clearBuffer();
    has_value_ = false;
    if (accumulated_repo_size_ != nullptr) {
      *accumulated_repo_size_ -= bufferSize;
    }
  }

  // atomic size pointer.
  std::atomic<size_t> *accumulated_repo_size_;
  // max size
  size_t *max_repo_size_;
  // determines if a write is pending; doubles as the spin lock flag.
  std::atomic<bool> write_pending_;
  // used to determine if a value is present in this atomic entry.
  std::atomic<bool> has_value_;
  // number of outstanding references handed out for value_.
  std::atomic<uint16_t> ref_count_;
  // set when a free was requested while references were outstanding.
  std::atomic<bool> free_required;
  // repo value.
  RepoValue<T> value_;
};
} /* namespace repository */
} /* namespace core */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ */