MINIFICPP-1140 - Fix VolatileFlowFileRepository
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #723
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 70affa7..c3298f6 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -55,7 +55,9 @@
std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
std::shared_ptr<minifi::ResourceClaim> newClaim = eventRead->getResourceClaim();
- content_repo_->remove(newClaim);
+ if (newClaim != nullptr) {
+ content_repo_->removeIfOrphaned(newClaim);
+ }
}
}
purge_list_.resize(0);
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 5c67395..df1a899 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -79,6 +79,10 @@
virtual void run() = 0;
+ virtual bool isNoop() {
+ return false;
+ }
+
/**
* Places a new object into the volatile memory area
* @param key key to add to the repository
@@ -87,6 +91,12 @@
virtual bool Put(T key, const uint8_t *buf, size_t bufLen);
/**
+ * Places new objects into the volatile memory area
+ * @param data the key-value pairs to add to the repository
+ **/
+ virtual bool MultiPut(const std::vector<std::pair<T, std::unique_ptr<io::DataStream>>>& data);
+
+ /**
* Deletes the key
* @return status of the delete operation
*/
@@ -280,6 +290,17 @@
logger_->log_debug("VolatileRepository -- put %u %u", current_size_.load(), current_index_.load());
return true;
}
+
+template<typename T>
+bool VolatileRepository<T>::MultiPut(const std::vector<std::pair<T, std::unique_ptr<io::DataStream>>>& data) {
+ for (const auto& item : data) {
+ if (!Put(item.first, item.second->getBuffer(), item.second->getSize())) {
+ return false;
+ }
+ }
+ return true;
+}
+
/**
* Deletes the key
* @return status of the delete operation