MINIFICPP-1228 - Refactor ResourceClaim reference counting
MINIFICPP-1228 - Processors can own FlowFiles.
MINIFICPP-1228 - From now on it isn't the Connection's responsibility to persist the FlowFiles.
MINIFICPP-1228 - BinFiles properly restores files.
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #807
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3479070..e38993f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -768,6 +768,10 @@
registerTest("${TEST_DIR}/flow-tests")
+if (NOT DISABLE_ROCKSDB AND NOT DISABLE_LIBARCHIVE)
+ registerTest("${TEST_DIR}/persistence-tests")
+endif()
+
include(BuildDocs)
include(DockerConfig)
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index bbb32cc..a7881ba 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -46,6 +46,7 @@
core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure");
+core::Relationship BinFiles::Self("__self__", "Marks the FlowFile to be owned by this processor");
const char *BinFiles::FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
const char *BinFiles::FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
const char *BinFiles::FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
@@ -153,7 +154,7 @@
void BinManager::removeOldestBin() {
std::lock_guard < std::mutex > lock(mutex_);
uint64_t olddate = ULLONG_MAX;
- std::unique_ptr < std::deque<std::unique_ptr<Bin>>>*oldqueue;
+ std::unique_ptr < std::deque<std::unique_ptr<Bin>>>* oldqueue;
for (std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>>::iterator it=groupBinMap_.begin(); it !=groupBinMap_.end(); ++it) {
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
if (!queue->empty()) {
@@ -235,6 +236,28 @@
}
void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+ // Rollback is not viable for this processor!!
+ {
+ // process resurrected FlowFiles first
+ auto flowFiles = file_store_.getNewFlowFiles();
+ // these are already-processed FlowFiles that we own
+ bool hadFailure = false;
+ for (auto &file : flowFiles) {
+ std::string groupId = getGroupId(context.get(), file);
+ bool offer = this->binManager_.offer(groupId, file);
+ if (!offer) {
+ session->transfer(file, Failure);
+ hadFailure = true;
+ } else {
+ // no need to route successfully captured files, as we already own them
+ }
+ }
+ if (hadFailure) {
+ context->yield();
+ return;
+ }
+ }
+
std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord > (session->get());
if (flow != nullptr) {
@@ -247,9 +270,8 @@
context->yield();
return;
}
-
- // remove the flowfile from the process session, it add to merge session later.
- session->remove(flow);
+ // assuming ownership over the incoming flowFile
+ session->transfer(flow, Self);
}
// migrate bin to ready bin
@@ -266,18 +288,19 @@
binManager_.getReadyBin(readyBins);
// process the ready bin
- if (!readyBins.empty()) {
+ while (!readyBins.empty()) {
// create session for merge
+ // we have to create a new session
+ // for each merge as a rollback erases all
+ // previously added files
core::ProcessSession mergeSession(context);
- while (!readyBins.empty()) {
- std::unique_ptr<Bin> bin = std::move(readyBins.front());
- readyBins.pop_front();
- // add bin's flows to the session
- this->addFlowsToSession(context.get(), &mergeSession, bin);
- logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
- if (!this->processBin(context.get(), &mergeSession, bin))
- this->transferFlowsToFail(context.get(), &mergeSession, bin);
- }
+ std::unique_ptr<Bin> bin = std::move(readyBins.front());
+ readyBins.pop_front();
+ // add bin's flows to the session
+ this->addFlowsToSession(context.get(), &mergeSession, bin);
+ logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
+ if (!this->processBin(context.get(), &mergeSession, bin))
+ this->transferFlowsToFail(context.get(), &mergeSession, bin);
mergeSession.commit();
}
}
@@ -297,6 +320,43 @@
}
}
+void BinFiles::put(std::shared_ptr<core::Connectable> flow) {
+ auto flowFile = std::dynamic_pointer_cast<core::FlowFile>(flow);
+ if (!flowFile) return;
+ if (flowFile->getOriginalConnection()) {
+ // onTrigger assumed ownership over a FlowFile
+ // don't have to do anything
+ return;
+ }
+ // no original connection i.e. we are during restore
+ file_store_.put(flowFile);
+}
+
+void BinFiles::FlowFileStore::put(std::shared_ptr<core::FlowFile>& flowFile) {
+ {
+ std::lock_guard<std::mutex> guard(flow_file_mutex_);
+ incoming_files_.emplace(std::move(flowFile));
+ }
+ has_new_flow_file_.store(true, std::memory_order_release);
+}
+
+std::unordered_set<std::shared_ptr<core::FlowFile>> BinFiles::FlowFileStore::getNewFlowFiles() {
+ bool hasNewFlowFiles = true;
+ if (!has_new_flow_file_.compare_exchange_strong(hasNewFlowFiles, false, std::memory_order_acquire, std::memory_order_relaxed)) {
+ return {};
+ }
+ std::lock_guard<std::mutex> guard(flow_file_mutex_);
+ return std::move(incoming_files_);
+}
+
+std::set<std::shared_ptr<core::Connectable>> BinFiles::getOutGoingConnections(const std::string &relationship) const {
+ auto result = core::Connectable::getOutGoingConnections(relationship);
+ if (relationship == Self.getName()) {
+ result.insert(std::static_pointer_cast<core::Connectable>(std::const_pointer_cast<core::Processor>(shared_from_this())));
+ }
+ return result;
+}
+
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index 098ac0e..f8b89ea 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -194,7 +194,7 @@
bool offer(const std::string &group, std::shared_ptr<core::FlowFile> flow);
// gather ready bins once the bin are full enough or exceed bin age
void gatherReadyBins();
- // remove oldest bin
+ // marks oldest bin as ready
void removeOldestBin();
// get ready bin from binManager
void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins);
@@ -218,6 +218,8 @@
// BinFiles Class
class BinFiles : public core::Processor {
+ protected:
+ static core::Relationship Self;
public:
// Constructor
/*!
@@ -262,14 +264,18 @@
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
// OnTrigger method, implemented by NiFi BinFiles
- virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
}
// OnTrigger method, implemented by NiFi BinFiles
- virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi BinFiles
- virtual void initialize(void);
+ void initialize(void) override;
+
+ void put(std::shared_ptr<core::Connectable> flow) override;
+
+ std::set<std::shared_ptr<core::Connectable>> getOutGoingConnections(const std::string &relationship) const override;
protected:
// Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
@@ -284,14 +290,29 @@
}
// transfer flows to failure in bin
void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
- // add flows to session
+ // moves owned flows to session
void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
BinManager binManager_;
private:
+ class FlowFileStore{
+ public:
+ /**
+ * Returns the already-preprocessed FlowFiles that got restored on restart from the FlowFileRepository
+ * @return the resurrected persisted FlowFiles
+ */
+ std::unordered_set<std::shared_ptr<core::FlowFile>> getNewFlowFiles();
+ void put(std::shared_ptr<core::FlowFile>& flowFile);
+ private:
+ std::atomic_bool has_new_flow_file_{false};
+ std::mutex flow_file_mutex_;
+ std::unordered_set<std::shared_ptr<core::FlowFile>> incoming_files_;
+ };
+
std::shared_ptr<logging::Logger> logger_;
int maxBinCount_;
+ FlowFileStore file_store_;
};
REGISTER_RESOURCE(BinFiles, "Bins flow files into buckets based on the number of entries or size of entries");
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 3512939..c6bd9df 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -110,6 +110,8 @@
return;
}
+ session->remove(flowFile);
+
std::string compressFormat = compressFormat_;
if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) {
std::string attr;
@@ -202,7 +204,6 @@
}
logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName);
session->transfer(processFlowFile, Success);
- session->remove(flowFile);
}
}
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 127ce9b..5184c7c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -81,10 +81,10 @@
return; // Stop here - don't delete from content repo while we have records in FF repo
}
- if (nullptr != content_repo_) {
+ if (content_repo_) {
for (const auto &ffr : purgeList) {
auto claim = ffr->getResourceClaim();
- if (claim != nullptr) {
+ if (claim) {
content_repo_->removeIfOrphaned(claim);
}
}
@@ -148,22 +148,27 @@
std::string key = it->key().ToString();
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
- auto search = connectionMap.find(eventRead->getConnectionUuid());
- if (!corrupt_checkpoint && search != connectionMap.end()) {
+ bool found = false;
+ auto search = containers.find(eventRead->getConnectionUuid());
+ found = (search != containers.end());
+ if (!found) {
+ // for backward compatibility
+ search = connectionMap.find(eventRead->getConnectionUuid());
+ found = (search != connectionMap.end());
+ }
+ if (!corrupt_checkpoint && found) {
// we find the connection for the persistent flowfile, create the flowfile and enqueue that
std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
eventRead->setStoredToRepository(true);
search->second->put(eventRead);
} else {
logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
- if (eventRead->getContentFullPath().length() > 0) {
- if (nullptr != eventRead->getResourceClaim()) {
- content_repo_->remove(eventRead->getResourceClaim());
- }
- }
+ auto claim = eventRead->getResourceClaim();
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
keys_to_delete.enqueue(key);
}
} else {
+ // failed to deserialize FlowFile, cannot clear claim
keys_to_delete.enqueue(key);
}
}
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 0d89355..43ace43 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -103,6 +103,8 @@
return;
}
+ session->remove(flowFile);
+
std::string directory;
if (!context->getProperty(Directory, directory, flowFile)) {
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 8969310..6451fc9 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -154,7 +154,7 @@
return queued_data_size_;
}
void put(std::shared_ptr<core::Connectable> flow) override {
- std::shared_ptr<core::FlowFile> ff = std::static_pointer_cast<core::FlowFile>(flow);
+ std::shared_ptr<core::FlowFile> ff = std::dynamic_pointer_cast<core::FlowFile>(flow);
if (nullptr != ff) {
put(ff);
}
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 666b6d2..1fa4c5d 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -160,7 +160,7 @@
}
const std::string getContentFullPath() {
- return content_full_fath_;
+ return claim_ ? claim_->getContentFullPath() : "";
}
/**
@@ -175,8 +175,6 @@
protected:
// connection uuid
std::string uuid_connection_;
- // Full path to the content
- std::string content_full_fath_;
// Local flow sequence ID
static std::atomic<uint64_t> local_flow_seq_number_;
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 212e651..badea9f 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -77,7 +77,7 @@
* Get outgoing connection based on relationship
* @return set of outgoing connections.
*/
- std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
+ virtual std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
virtual void put(std::shared_ptr<Connectable> flow) {
}
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 563a8e7..ce64873 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -22,6 +22,7 @@
#include <memory>
#include <set>
#include <string>
+#include <utility>
#include "utils/TimeUtil.h"
#include "ResourceClaim.h"
@@ -35,9 +36,67 @@
namespace core {
class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+ class FlowFileOwnedResourceClaimPtr{
+ public:
+ FlowFileOwnedResourceClaimPtr() = default;
+ explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+ // taking ownership of claim, no need to increment/decrement
+ }
+ FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+ FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+ FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+ return set(owner, ref.claim_);
+ }
+ FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+ auto oldClaim = claim_;
+ claim_ = newClaim;
+ // the order of increase/release is important
+ // always increment the new claim's refcount first, then release the old one, so we cannot
+ // accidentally drop the underlying object; a simple equality check would not suffice, as two
+ // distinct ResourceClaim instances can reference the same file (i.e. share the same contentPath)
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ if (oldClaim) owner.releaseClaim(oldClaim);
+ return *this;
+ }
+ const std::shared_ptr<ResourceClaim>& get() const {
+ return claim_;
+ }
+ const std::shared_ptr<ResourceClaim>& operator->() const {
+ return claim_;
+ }
+ operator bool() const noexcept {
+ return static_cast<bool>(claim_);
+ }
+ ~FlowFileOwnedResourceClaimPtr() {
+ // allow the owner FlowFile to manually release the claim
+ // while logging stuff and removing it from repositories
+ assert(!claim_);
+ }
+
+ private:
+ /*
+ * We are aiming for the constraint that all FlowFiles should have a non-null claim pointer,
+ * unfortunately, for now, some places (e.g. ProcessSession::create) violate this constraint.
+ * We should indicate an empty or invalid content with special claims like
+ * InvalidResourceClaim and EmptyResourceClaim.
+ */
+ std::shared_ptr<ResourceClaim> claim_;
+ };
+
public:
FlowFile();
- ~FlowFile();
+ ~FlowFile() override;
FlowFile& operator=(const FlowFile& other);
/**
@@ -48,7 +107,7 @@
/**
* Sets _claim to the inbound claim argument
*/
- void setResourceClaim(std::shared_ptr<ResourceClaim> &claim);
+ void setResourceClaim(const std::shared_ptr<ResourceClaim>& claim);
/**
* clear the resource claim
@@ -59,22 +118,22 @@
* Returns a pointer to this flow file record's
* claim at the given stash key
*/
- std::shared_ptr<ResourceClaim> getStashClaim(const std::string &key);
+ std::shared_ptr<ResourceClaim> getStashClaim(const std::string& key);
/**
* Sets the given stash key to the inbound claim argument
*/
- void setStashClaim(const std::string &key, const std::shared_ptr<ResourceClaim> &claim);
+ void setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim);
/**
* Clear the resource claim at the given stash key
*/
- void clearStashClaim(const std::string &key);
+ void clearStashClaim(const std::string& key);
/**
* Return true if the given stash claim exists
*/
- bool hasStashClaim(const std::string &key);
+ bool hasStashClaim(const std::string& key);
/**
* Decrease the flow file record owned count for the resource claim and, if
@@ -85,7 +144,7 @@
/**
* Get lineage identifiers
*/
- std::set<std::string> &getlineageIdentifiers();
+ std::set<std::string>& getlineageIdentifiers();
/**
* Returns whether or not this flow file record
@@ -134,7 +193,7 @@
* @param value value to set
* @return result of finding key
*/
- bool getAttribute(std::string key, std::string &value) const;
+ bool getAttribute(std::string key, std::string& value) const;
/**
* Updates the value in the attribute map that corresponds
@@ -155,7 +214,7 @@
/**
* setAttribute, if attribute already there, update it, else, add it
*/
- void setAttribute(const std::string &key, const std::string &value) {
+ void setAttribute(const std::string& key, const std::string& value) {
attributes_[key] = value;
}
@@ -179,7 +238,7 @@
* adds an attribute if it does not exist
*
*/
- bool addAttribute(const std::string &key, const std::string &value);
+ bool addAttribute(const std::string& key, const std::string& value);
/**
* Set the size of this record.
@@ -220,7 +279,7 @@
*/
uint64_t getOffset() const;
- bool getUUID(utils::Identifier &other) {
+ bool getUUID(utils::Identifier& other) {
other = uuid_;
return true;
}
@@ -237,12 +296,12 @@
/**
* Yield
*/
- virtual void yield() {
+ void yield() override {
}
/**
* Determines if we are connected and operating
*/
- virtual bool isRunning() {
+ bool isRunning() override {
return true;
}
@@ -250,7 +309,7 @@
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
- virtual bool isWorkAvailable() {
+ bool isWorkAvailable() override {
return true;
}
@@ -258,13 +317,13 @@
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
- void setConnection(std::shared_ptr<core::Connectable> &connection);
+ void setConnection(std::shared_ptr<core::Connectable>& connection);
/**
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
- void setConnection(std::shared_ptr<core::Connectable> &&connection);
+ void setConnection(std::shared_ptr<core::Connectable>&& connection);
/**
* Returns the connection referenced by this record.
@@ -275,7 +334,7 @@
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
- void setOriginalConnection(std::shared_ptr<core::Connectable> &connection);
+ void setOriginalConnection(std::shared_ptr<core::Connectable>& connection);
/**
* Returns the original connection referenced by this record.
* @return shared original connection pointer.
@@ -314,9 +373,9 @@
// Attributes key/values pairs for the flow record
std::map<std::string, std::string> attributes_;
// Pointer to the associated content resource claim
- std::shared_ptr<ResourceClaim> claim_;
+ FlowFileOwnedResourceClaimPtr claim_;
// Pointers to stashed content resource claims
- std::map<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
+ std::map<std::string, FlowFileOwnedResourceClaimPtr> stashedContent_;
// UUID string
// std::string uuid_str_;
// UUID string for all parents
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 370426a..1f7a90a 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -228,6 +228,8 @@
void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap);
+ void getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const;
+
void drainConnections();
std::size_t getTotalFlowFileCount() const;
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index b98eb85..e6e4f23 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -59,10 +59,10 @@
provenance_report_ = std::make_shared<provenance::ProvenanceReporter>(repo, process_context_->getProcessorNode()->getName(), process_context_->getProcessorNode()->getName());
}
-// Destructor
+ // Destructor
virtual ~ProcessSession();
-// Commit the session
+ // Commit the session
void commit();
// Roll Back the session
void rollback();
@@ -70,7 +70,6 @@
std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() {
return provenance_report_;
}
- //
// Get the FlowFile from the highest priority queue
virtual std::shared_ptr<core::FlowFile> get();
// Create a new UUID FlowFile with no content resource claim and without parent
@@ -81,7 +80,7 @@
std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile> &parent);
// Add a FlowFile to the session
virtual void add(const std::shared_ptr<core::FlowFile> &flow);
-// Clone a new UUID FlowFile from parent both for content resource claim and attributes
+ // Clone a new UUID FlowFile from parent both for content resource claim and attributes
std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent);
// Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size);
@@ -140,21 +139,24 @@
ProcessSession &operator=(const ProcessSession &parent) = delete;
protected:
-// FlowFiles being modified by current process session
- std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles;
+ // FlowFiles being modified by current process session
+ std::map<std::string, std::shared_ptr<core::FlowFile>> _updatedFlowFiles;
// Copy of the original FlowFiles being modified by current process session as above
- std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles;
+ std::map<std::string, std::shared_ptr<core::FlowFile>> _flowFileSnapShots;
// FlowFiles being added by current process session
- std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles;
+ std::map<std::string, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
// FlowFiles being deleted by current process session
- std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles;
+ std::map<std::string, std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
// FlowFiles being transfered to the relationship
std::map<std::string, Relationship> _transferRelationship;
// FlowFiles being cloned for multiple connections per relationship
- std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles;
+ std::map<std::string, std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
private:
-// Clone the flow file during transfer to multiple connections for a relationship
+ void persistFlowFilesBeforeTransfer(
+ std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
+ const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots);
+ // Clone the flow file during transfer to multiple connections for a relationship
std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent);
// ProcessContext
std::shared_ptr<ProcessContext> process_context_;
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index dea7212..78ec5ec 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -215,8 +215,6 @@
// Whether flow file queue full in any of the outgoin connection
bool flowFilesOutGoingFull();
- // Get outgoing connections based on relationship name
- std::set<std::shared_ptr<Connection> > getOutGoingConnections(std::string relationship);
// Add connection
bool addConnection(std::shared_ptr<Connectable> connection);
// Remove connection
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 671b468..b52405c 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -54,7 +54,7 @@
return processor_;
}
- void yield() {
+ void yield() override {
processor_->yield();
}
@@ -107,7 +107,7 @@
* Returns theflow version
* @returns flow version. can be null if a flow version is not tracked.
*/
- virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const {
+ std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const override {
if (processor_ != nullptr) {
return processor_->getFlowIdentifier();
} else {
@@ -224,7 +224,7 @@
* Get outgoing connection based on relationship
* @return set of outgoing connections.
*/
- std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship) {
+ std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string& relationship) const override {
return processor_->getOutGoingConnections(relationship);
}
@@ -236,7 +236,7 @@
return processor_->getNextIncomingConnection();
}
- std::shared_ptr<Connectable> pickIncomingConnection() {
+ std::shared_ptr<Connectable> pickIncomingConnection() override {
return processor_->pickIncomingConnection();
}
@@ -269,7 +269,7 @@
}
// Get Process Name
- std::string getName() const {
+ std::string getName() const override {
return processor_->getName();
}
@@ -281,18 +281,18 @@
processor_->setMaxConcurrentTasks(tasks);
}
- virtual bool supportsDynamicProperties() {
+ bool supportsDynamicProperties() override {
return false;
}
- virtual bool isRunning();
+ bool isRunning() override;
- virtual bool isWorkAvailable();
+ bool isWorkAvailable() override;
virtual ~ProcessorNode();
protected:
- virtual bool canEdit() {
+ bool canEdit() override {
return !processor_->isRunning();
}
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index e44c479..0741323 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -117,6 +117,10 @@
this->connectionMap = connectionMap;
}
+ void setContainers(std::map<std::string, std::shared_ptr<core::Connectable>> &containers) {
+ this->containers = containers;
+ }
+
virtual bool Get(const std::string &key, std::string &value) {
return false;
}
@@ -228,6 +232,8 @@
Repository &operator=(const Repository &parent) = delete;
protected:
+ std::map<std::string, std::shared_ptr<core::Connectable>> containers;
+
std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap;
// Mutex for protection
std::mutex mutex_;
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 06cfcce..334d960 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -148,17 +148,6 @@
logger_->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr(), name_);
}
- if (!flow->isStored()) {
- // Save to the flowfile repo
- FlowFileRecord event(flow_repository_, content_repo_, flow, this->uuidStr_);
- if (event.Serialize()) {
- flow->setStoredToRepository(true);
- } else {
- logger_->log_error("Failed to serialize FlowFileRecord to repo!");
- throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfile to repository");
- }
- }
-
// Notify receiving processor that work may be available
if (dest_connectable_) {
logger_->log_debug("Notifying %s that %s was inserted", dest_connectable_->getName(), flow->getUUIDStr());
@@ -167,8 +156,6 @@
}
void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
- std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
-
{
std::lock_guard<std::mutex> lock(mutex_);
@@ -182,35 +169,12 @@
queued_data_size_ += ff->getSize();
logger_->log_debug("Enqueue flow file UUID %s to connection %s", ff->getUUIDStr(), name_);
-
- if (!ff->isStored()) {
- // Save to the flowfile repo
- FlowFileRecord event(flow_repository_, content_repo_, ff, this->uuidStr_);
-
- std::unique_ptr<io::DataStream> stramptr(new io::DataStream());
- event.Serialize(*stramptr.get());
-
- flowData.emplace_back(event.getUUIDStr(), std::move(stramptr));
- }
}
}
- if (!flow_repository_->MultiPut(flowData)) {
- logger_->log_error("Failed execute multiput on FF repo!");
- throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
- }
-
- for (auto& ff : flows) {
- if (drop_empty_ && ff->getSize() == 0) {
- continue;
- }
-
- ff->setStoredToRepository(true);
-
- if (dest_connectable_) {
- logger_->log_debug("Notifying %s that flowfiles were inserted", dest_connectable_->getName());
- dest_connectable_->notifyWork();
- }
+ if (dest_connectable_) {
+ logger_->log_debug("Notifying %s that flowfiles were inserted", dest_connectable_->getName());
+ dest_connectable_->notifyWork();
}
}
@@ -228,9 +192,6 @@
// Flow record expired
expiredFlowRecords.insert(item);
logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
- if (flow_repository_->Delete(item->getUUIDStr())) {
- item->setStoredToRepository(false);
- }
} else {
// Flow record not expired
if (item->isPenalized()) {
@@ -268,10 +229,12 @@
while (!queue_.empty()) {
std::shared_ptr<core::FlowFile> item = queue_.front();
queue_.pop();
- logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
+ logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
if (delete_permanently) {
- if (flow_repository_->Delete(item->getUUIDStr())) {
+ if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
item->setStoredToRepository(false);
+ auto claim = item->getResourceClaim();
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
}
}
}
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 80a0102..da1c676 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -405,10 +405,13 @@
if (this->flow_file_repo_ != nullptr) {
logger_->log_debug("Getting connection map");
std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap;
+ std::map<std::string, std::shared_ptr<core::Connectable>> containers;
if (this->root_ != nullptr) {
this->root_->getConnections(connectionMap);
+ this->root_->getFlowFileContainers(containers);
}
flow_file_repo_->setConnectionMap(connectionMap);
+ flow_file_repo_->setContainers(containers);
flow_file_repo_->loadComponent(content_repo_);
} else {
logger_->log_debug("Flow file repository is not set");
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 6dbbd75..0d66412 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -17,7 +17,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "FlowFileRecord.h"
#include <time.h>
#include <cstdio>
#include <vector>
@@ -27,6 +26,8 @@
#include <string>
#include <iostream>
#include <fstream>
+#include <cinttypes>
+#include "FlowFileRecord.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/Relationship.h"
#include "core/Repository.h"
@@ -45,7 +46,7 @@
content_repo_(content_repo),
flow_repository_(flow_repository) {
id_ = local_flow_seq_number_.load();
- claim_ = claim;
+ claim_.set(*this, claim);
// Increase the local ID for the flow record
++local_flow_seq_number_;
// Populate the default attributes
@@ -59,12 +60,6 @@
}
snapshot_ = false;
-
- if (claim_ != nullptr) {
- // Increase the flow file record owned count for the resource claim
- claim_->increaseFlowFileRecordOwnedCount();
- content_full_fath_ = claim->getContentFullPath();
- }
}
FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event,
@@ -82,10 +77,7 @@
offset_ = event->getOffset();
event->getUUID(uuid_);
uuid_connection_ = uuidConnection;
- if (event->getResourceClaim()) {
- event->getResourceClaim()->increaseFlowFileRecordOwnedCount();
- content_full_fath_ = event->getResourceClaim()->getContentFullPath();
- }
+ claim_.set(*this, event->getResourceClaim());
if (event->getFlowIdentifier()) {
std::string attr;
event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -102,6 +94,7 @@
snapshot_(""),
content_repo_(content_repo),
flow_repository_(flow_repository) {
+ claim_.set(*this, event->getResourceClaim());
if (event->getFlowIdentifier()) {
std::string attr;
event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -118,29 +111,26 @@
logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
else
logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
- if (claim_) {
- releaseClaim(claim_);
- } else {
+
+ if (!claim_) {
logger_->log_debug("Claim is null ptr for %s", uuidStr_);
}
+ claim_.set(*this, nullptr);
+
// Disown stash claims
- for (const auto &stashPair : stashedContent_) {
- releaseClaim(stashPair.second);
+ for (auto &stashPair : stashedContent_) {
+ auto& stashClaim = stashPair.second;
+ stashClaim.set(*this, nullptr);
}
}
void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
// Decrease the flow file record owned count for the resource claim
- claim_->decreaseFlowFileRecordOwnedCount();
- std::string value;
- logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
- if (claim_->getFlowFileRecordOwnedCount() <= 0) {
- // we cannot rely on the stored variable here since we aren't guaranteed atomicity
- if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
- logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
- content_repo_->remove(claim_);
- }
+ claim->decreaseFlowFileRecordOwnedCount();
+ logger_->log_debug("Detaching Resource Claim %s, %s, attempt " "%" PRIu64, getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());
+ if (content_repo_ && content_repo_->removeIfOrphaned(claim)) {
+ logger_->log_debug("Deleted Resource Claim %s", claim->getContentFullPath());
}
}
@@ -187,7 +177,6 @@
FlowFileRecord &FlowFileRecord::operator=(const FlowFileRecord &other) {
core::FlowFile::operator=(other);
uuid_connection_ = other.uuid_connection_;
- content_full_fath_ = other.content_full_fath_;
snapshot_ = other.snapshot_;
return *this;
}
@@ -260,7 +249,7 @@
}
}
- ret = writeUTF(this->content_full_fath_, &outStream);
+ ret = writeUTF(this->getContentFullPath(), &outStream);
if (ret <= 0) {
return false;
}
@@ -291,6 +280,8 @@
if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_, outStream.getSize());
+ // on behalf of the persisted record instance
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
return true;
} else {
logger_->log_error("NiFi FlowFile Store event %s size %llu fail", uuidStr_, outStream.getSize());
@@ -351,7 +342,8 @@
this->attributes_[key] = value;
}
- ret = readUTF(this->content_full_fath_, &outStream);
+ std::string content_full_path;
+ ret = readUTF(content_full_path, &outStream);
if (ret <= 0) {
return false;
}
@@ -366,9 +358,7 @@
return false;
}
- if (nullptr == claim_) {
- claim_ = std::make_shared<ResourceClaim>(content_full_fath_, content_repo_, true);
- }
+ claim_.set(*this, std::make_shared<ResourceClaim>(content_full_path, content_repo_, true));
return true;
}
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 6b7ee2f..f94f2c9 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -64,9 +64,7 @@
size_ = other.size_;
penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
attributes_ = other.attributes_;
- claim_ = other.claim_;
- if (claim_ != nullptr)
- this->claim_->increaseFlowFileRecordOwnedCount();
+ claim_.set(*this, other.claim_);
uuidStr_ = other.uuidStr_;
connection_ = other.connection_;
original_connection_ = other.original_connection_;
@@ -95,36 +93,39 @@
}
std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
- return claim_;
+ return claim_.get();
}
void FlowFile::clearResourceClaim() {
- claim_ = nullptr;
+ claim_.set(*this, nullptr);
}
-void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
- claim_ = claim;
+void FlowFile::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) {
+ claim_.set(*this, claim);
}
-std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string &key) {
- return stashedContent_[key];
+std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string& key) {
+ return stashedContent_[key].get();
}
-void FlowFile::setStashClaim(const std::string &key, const std::shared_ptr<ResourceClaim> &claim) {
+void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) {
if (hasStashClaim(key)) {
logger_->log_warn("Stashing content of record %s to existing key %s; "
"existing content will be overwritten",
getUUIDStr().c_str(), key.c_str());
- releaseClaim(getStashClaim(key));
}
- stashedContent_[key] = claim;
+ stashedContent_[key].set(*this, claim);
}
-void FlowFile::clearStashClaim(const std::string &key) {
- stashedContent_.erase(key);
+void FlowFile::clearStashClaim(const std::string& key) {
+ auto claimIt = stashedContent_.find(key);
+ if (claimIt != stashedContent_.end()) {
+ claimIt->second.set(*this, nullptr);
+ stashedContent_.erase(claimIt);
+ }
}
-bool FlowFile::hasStashClaim(const std::string &key) {
+bool FlowFile::hasStashClaim(const std::string& key) {
return stashedContent_.find(key) != stashedContent_.end();
}
@@ -140,11 +141,11 @@
return lineage_start_date_;
}
-std::set<std::string> &FlowFile::getlineageIdentifiers() {
+std::set<std::string>& FlowFile::getlineageIdentifiers() {
return lineage_Identifiers_;
}
-bool FlowFile::getAttribute(std::string key, std::string &value) const {
+bool FlowFile::getAttribute(std::string key, std::string& value) const {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
value = it->second;
@@ -183,7 +184,7 @@
}
}
-bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
+bool FlowFile::addAttribute(const std::string& key, const std::string& value) {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
// attribute already there in the map
@@ -202,7 +203,7 @@
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
-void FlowFile::setOriginalConnection(std::shared_ptr<core::Connectable> &connection) {
+void FlowFile::setOriginalConnection(std::shared_ptr<core::Connectable>& connection) {
original_connection_ = connection;
}
@@ -210,7 +211,7 @@
* Sets the connection with a shared pointer.
* @param connection shared connection.
*/
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
+void FlowFile::setConnection(std::shared_ptr<core::Connectable>& connection) {
connection_ = connection;
}
@@ -218,7 +219,7 @@
* Sets the connection with a shared pointer.
* @param connection shared connection.
*/
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
+void FlowFile::setConnection(std::shared_ptr<core::Connectable>&& connection) {
connection_ = connection;
}
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 01e403e..28e6c9a 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -360,6 +360,20 @@
}
}
+void ProcessGroup::getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const {
+ for (auto connection : connections_) {
+ containers[connection->getUUIDStr()] = connection;
+ containers[connection->getName()] = connection;
+ }
+ for (auto processor : processors_) {
+ // processors can also own FlowFiles
+ containers[processor->getUUIDStr()] = processor;
+ }
+ for (auto processGroup : child_process_groups_) {
+ processGroup->getFlowFileContainers(containers);
+ }
+}
+
void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 67ab5e1..e14d812 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -88,7 +88,11 @@
}
void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
+ if (_updatedFlowFiles.find(record->getUUIDStr()) != _updatedFlowFiles.end()) {
+ throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
+ }
_addedFlowFiles[record->getUUIDStr()] = record;
+ record->setDeleted(false);
}
std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<core::FlowFile> &parent) {
@@ -130,10 +134,9 @@
// Copy Resource Claim
std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
record->setResourceClaim(parent_claim);
- if (parent_claim != nullptr) {
+ if (parent_claim) {
record->setOffset(parent->getOffset());
record->setSize(parent->getSize());
- record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
}
provenance_report_->clone(parent, record);
}
@@ -170,10 +173,9 @@
// Copy Resource Claim
std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
record->setResourceClaim(parent_claim);
- if (parent_claim != nullptr) {
+ if (parent_claim) {
record->setOffset(parent->getOffset());
record->setSize(parent->getSize());
- record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
}
provenance_report_->clone(parent, record);
}
@@ -198,11 +200,7 @@
record->setOffset(parent->getOffset() + offset);
record->setSize(size);
// Copy Resource Claim
- std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
- record->setResourceClaim(parent_claim);
- if (parent_claim != nullptr) {
- record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
- }
+ record->setResourceClaim(parent->getResourceClaim());
}
provenance_report_->clone(parent, record);
}
@@ -211,15 +209,6 @@
void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
flow->setDeleted(true);
- if (flow->getResourceClaim() != nullptr) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- logger_->log_debug("Auto terminated %s %" PRIu64 " %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr());
- } else {
- logger_->log_debug("Flow does not contain content. no resource claim to decrement.");
- }
- if (_addedFlowFiles.find(flow->getUUIDStr()) == _addedFlowFiles.end()) {
- process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
- }
_deletedFlowFiles[flow->getUUIDStr()] = flow;
std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
provenance_report_->drop(flow, reason);
@@ -248,6 +237,7 @@
void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
_transferRelationship[flow->getUUIDStr()] = relationship;
+ flow->setDeleted(false);
}
void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
@@ -255,28 +245,19 @@
try {
uint64_t startTime = getTimeMillis();
- claim->increaseFlowFileRecordOwnedCount();
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
// Call the callback to write the content
if (nullptr == stream) {
- claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
if (callback->process(stream) < 0) {
- claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
flow->setSize(stream->getSize());
flow->setOffset(0);
- std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
- if (flow_claim != nullptr) {
- // Remove the old claim
- flow_claim->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
flow->setResourceClaim(claim);
stream->closeStream();
@@ -285,31 +266,21 @@
uint64_t endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
logger_->log_debug("Caught Exception during process session write");
throw;
}
}
void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
- std::shared_ptr<ResourceClaim> claim = nullptr;
- if (flow->getResourceClaim() == nullptr) {
+ std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
+ if (!claim) {
// No existed claim for append, we need to create new claim
return write(flow, callback);
}
- claim = flow->getResourceClaim();
-
try {
uint64_t startTime = getTimeMillis();
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
@@ -392,12 +363,10 @@
try {
auto startTime = getTimeMillis();
- claim->increaseFlowFileRecordOwnedCount();
std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
if (nullptr == content_stream) {
logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
- claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
@@ -414,11 +383,6 @@
flow->setSize(content_stream->getSize());
flow->setOffset(0);
- if (flow->getResourceClaim() != nullptr) {
- // Remove the old claim
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
flow->setResourceClaim(claim);
logger_->log_debug("Import offset %" PRIu64 " length %" PRIu64 " into content %s for FlowFile UUID %s",
@@ -430,17 +394,9 @@
auto endTime = getTimeMillis();
provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
} catch (std::exception &exception) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
logger_->log_debug("Caught Exception during process session write");
throw;
}
@@ -455,10 +411,8 @@
auto startTime = getTimeMillis();
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
- claim->increaseFlowFileRecordOwnedCount();
std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
if (nullptr == stream) {
- claim->decreaseFlowFileRecordOwnedCount();
rollback();
return;
}
@@ -490,11 +444,6 @@
if (!invalidWrite) {
flow->setSize(stream->getSize());
flow->setOffset(0);
- if (flow->getResourceClaim() != nullptr) {
- // Remove the old claim
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
flow->setResourceClaim(claim);
logger_->log_debug("Import offset %" PRIu64 " length %" PRIu64 " into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(),
@@ -517,17 +466,9 @@
throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
}
} catch (std::exception &exception) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
logger_->log_debug("Caught Exception %s", exception.what());
throw;
} catch (...) {
- if (flow && flow->getResourceClaim() == claim) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flow->clearResourceClaim();
- }
logger_->log_debug("Caught Exception during process session write");
throw;
}
@@ -540,111 +481,97 @@
std::vector<uint8_t> buffer(getpagesize());
try {
- try {
- std::ifstream input{source, std::ios::in | std::ios::binary};
- logger_->log_debug("Opening %s", source);
- if (!input.is_open() || !input.good()) {
- throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+ std::ifstream input{source, std::ios::in | std::ios::binary};
+ logger_->log_debug("Opening %s", source);
+ if (!input.is_open() || !input.good()) {
+ throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+ }
+ if (offset != 0U) {
+ input.seekg(offset, std::ifstream::beg);
+ if (!input.good()) {
+ logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+ throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
}
- if (offset != 0U) {
- input.seekg(offset, std::ifstream::beg);
- if (!input.good()) {
- logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
- throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
- }
+ }
+ uint64_t startTime = 0U;
+ while (input.good()) {
+ input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+ std::streamsize read = input.gcount();
+ if (read < 0) {
+ throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
}
- uint64_t startTime = 0U;
- while (input.good()) {
- input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
- std::streamsize read = input.gcount();
- if (read < 0) {
- throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
- }
- if (read == 0) {
- logger_->log_trace("Finished reading input %s", source);
+ if (read == 0) {
+ logger_->log_trace("Finished reading input %s", source);
+ break;
+ } else {
+ logging::LOG_TRACE(logger_) << "Read input of " << read;
+ }
+ uint8_t* begin = buffer.data();
+ uint8_t* end = begin + read;
+ while (true) {
+ startTime = getTimeMillis();
+ uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+ const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+ logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+ /*
+ * We do not want to process the rest of the buffer after the last delimiter if
+ * - we have reached EOF in the file (we would discard it anyway)
+ * - there is nothing to process (the last character in the buffer is a delimiter)
+ */
+ if (delimiterPos == end && (input.eof() || len == 0)) {
break;
- } else {
- logging::LOG_TRACE(logger_) << "Read input of " << read;
}
- uint8_t* begin = buffer.data();
- uint8_t* end = begin + read;
- while (true) {
+
+ /* Create claim and stream if needed and append data */
+ if (claim == nullptr) {
startTime = getTimeMillis();
- uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
- const auto len = gsl::narrow<int>(delimiterPos - begin);
-
- logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
- /*
- * We do not want to process the rest of the buffer after the last delimiter if
- * - we have reached EOF in the file (we would discard it anyway)
- * - there is nothing to process (the last character in the buffer is a delimiter)
- */
- if (delimiterPos == end && (input.eof() || len == 0)) {
- break;
- }
-
- /* Create claim and stream if needed and append data */
- if (claim == nullptr) {
- startTime = getTimeMillis();
- claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
- }
- if (stream == nullptr) {
- stream = process_context_->getContentRepository()->write(claim);
- }
- if (stream == nullptr) {
- logger_->log_error("Stream is null");
- rollback();
- return;
- }
- if (stream->write(begin, len) != len) {
- logger_->log_error("Error while writing");
- stream->closeStream();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
- }
-
- /* Create a FlowFile if we reached a delimiter */
- if (delimiterPos == end) {
- break;
- }
- flowFile = std::static_pointer_cast<FlowFileRecord>(create());
- flowFile->setSize(stream->getSize());
- flowFile->setOffset(0);
- if (flowFile->getResourceClaim() != nullptr) {
- /* Remove the old claim */
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
- flowFile->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
- logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
- << ", FlowFile UUID " << flowFile->getUUIDStr();
- stream->closeStream();
- std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flowFile, details, endTime - startTime);
- flows.push_back(flowFile);
-
- /* Reset these to start processing the next FlowFile with a clean slate */
- flowFile.reset();
- stream.reset();
- claim.reset();
-
- /* Skip delimiter */
- begin = delimiterPos + 1;
+ claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
}
+ if (stream == nullptr) {
+ stream = process_context_->getContentRepository()->write(claim);
+ }
+ if (stream == nullptr) {
+ logger_->log_error("Stream is null");
+ rollback();
+ return;
+ }
+ if (stream->write(begin, len) != len) {
+ logger_->log_error("Error while writing");
+ stream->closeStream();
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+ }
+
+ /* Create a FlowFile if we reached a delimiter */
+ if (delimiterPos == end) {
+ break;
+ }
+ flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+ flowFile->setSize(stream->getSize());
+ flowFile->setOffset(0);
+ flowFile->setResourceClaim(claim);
+ logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+ << ", FlowFile UUID " << flowFile->getUUIDStr();
+ stream->closeStream();
+ std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+ flows.push_back(flowFile);
+
+ /* Reset these to start processing the next FlowFile with a clean slate */
+ flowFile.reset();
+ stream.reset();
+ claim.reset();
+
+ /* Skip delimiter */
+ begin = delimiterPos + 1;
}
- } catch (std::exception &exception) {
- logger_->log_debug("Caught Exception %s", exception.what());
- throw;
- } catch (...) {
- logger_->log_debug("Caught Exception during process session write");
- throw;
}
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
} catch (...) {
- if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
+ logger_->log_debug("Caught Exception during process session write");
throw;
}
}
@@ -700,39 +627,38 @@
void ProcessSession::stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
logger_->log_debug("Stashing content from %s to key %s", flow->getUUIDStr(), key);
- if (!flow->getResourceClaim()) {
+ auto claim = flow->getResourceClaim();
+ if (!claim) {
logger_->log_warn("Attempted to stash content of record %s when "
"there is no resource claim",
flow->getUUIDStr());
return;
}
-// Stash the claim
- auto claim = flow->getResourceClaim();
+ // Stash the claim
flow->setStashClaim(key, claim);
-// Clear current claim
+ // Clear current claim
flow->clearResourceClaim();
}
void ProcessSession::restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr(), key);
-// Restore the claim
+ // Restore the claim
if (!flow->hasStashClaim(key)) {
logger_->log_warn("Requested restore to record %s from unknown key %s", flow->getUUIDStr(), key);
return;
}
-// Disown current claim if existing
+ // Disown current claim if existing
if (flow->getResourceClaim()) {
logger_->log_warn("Restoring stashed content of record %s from key %s when there is "
"existing content; existing content will be overwritten",
flow->getUUIDStr(), key);
- flow->releaseClaim(flow->getResourceClaim());
}
-// Restore the claim
+ // Restore the claim
auto stashClaim = flow->getStashClaim(key);
flow->setResourceClaim(stashClaim);
flow->clearStashClaim(key);
@@ -829,9 +755,9 @@
}
}
- std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+ std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
- std::shared_ptr<Connection> connection = nullptr;
+ std::shared_ptr<Connectable> connection = nullptr;
// Complete process the added and update flow files for the session, send the flow file to its queue
for (const auto &it : _updatedFlowFiles) {
std::shared_ptr<core::FlowFile> record = it.second;
@@ -840,7 +766,7 @@
continue;
}
- connection = std::static_pointer_cast<Connection>(record->getConnection());
+ connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
@@ -851,7 +777,7 @@
if (record->isDeleted()) {
continue;
}
- connection = std::static_pointer_cast<Connection>(record->getConnection());
+ connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
@@ -863,14 +789,41 @@
if (record->isDeleted()) {
continue;
}
- connection = std::static_pointer_cast<Connection>(record->getConnection());
+ connection = record->getConnection();
if ((connection) != nullptr) {
connectionQueues[connection].push_back(record);
}
}
+ for (const auto& it : _deletedFlowFiles) {
+ auto record = it.second;
+ if (!record->isDeleted()) {
+ continue;
+ }
+ if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
+ record->setStoredToRepository(false);
+ auto claim = record->getResourceClaim();
+ if (claim) {
+ claim->decreaseFlowFileRecordOwnedCount();
+ logger_->log_debug("Decrementing resource claim on behalf of the persisted instance %s %" PRIu64 " %s",
+ claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount(), record->getUUIDStr());
+ } else {
+ logger_->log_debug("Flow does not contain content. no resource claim to decrement.");
+ }
+ }
+ }
+
+ persistFlowFilesBeforeTransfer(connectionQueues, _flowFileSnapShots);
+
for (auto& cq : connectionQueues) {
- cq.first->multiPut(cq.second);
+ auto connection = std::dynamic_pointer_cast<Connection>(cq.first);
+ if (connection) {
+ connection->multiPut(cq.second);
+ } else {
+ for (auto& file : cq.second) {
+ cq.first->put(file);
+ }
+ }
}
// All done
@@ -878,7 +831,7 @@
_addedFlowFiles.clear();
_clonedFlowFiles.clear();
_deletedFlowFiles.clear();
- _originalFlowFiles.clear();
+ _flowFileSnapShots.clear();
_transferRelationship.clear();
// persistent the provenance report
@@ -894,14 +847,18 @@
}
void ProcessSession::rollback() {
- std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+ // new FlowFiles are only persisted during commit
+ // no need to delete them here
+ std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
try {
- std::shared_ptr<Connection> connection = nullptr;
- // Requeue the snapshot of the flowfile back
- for (const auto &it : _originalFlowFiles) {
+ std::shared_ptr<Connectable> connection = nullptr;
+ // Restore the flowFiles from the snapshot
+ for (const auto &it : _updatedFlowFiles) {
std::shared_ptr<core::FlowFile> record = it.second;
- connection = std::static_pointer_cast<Connection>(record->getOriginalConnection());
+ auto snapshot = _flowFileSnapShots[record->getUUIDStr()];
+ *record = *snapshot;
+ connection = record->getOriginalConnection();
if ((connection) != nullptr) {
std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record);
flowf->setSnapShot(false);
@@ -910,11 +867,23 @@
}
}
- for (auto& cq : connectionQueues) {
- cq.first->multiPut(cq.second);
+ for (const auto& it : _deletedFlowFiles) {
+ it.second->setDeleted(false);
}
- _originalFlowFiles.clear();
+ // put everything back where it came from
+ for (auto& cq : connectionQueues) {
+ auto connection = std::dynamic_pointer_cast<Connection>(cq.first);
+ if (connection) {
+ connection->multiPut(cq.second);
+ } else {
+ for (auto& flow : cq.second) {
+ cq.first->put(flow);
+ }
+ }
+ }
+
+ _flowFileSnapShots.clear();
_clonedFlowFiles.clear();
_addedFlowFiles.clear();
@@ -930,6 +899,71 @@
}
}
+void ProcessSession::persistFlowFilesBeforeTransfer(
+ std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
+ const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots) {
+
+ std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
+
+ auto flowFileRepo = process_context_->getFlowFileRepository();
+ auto contentRepo = process_context_->getContentRepository();
+
+ for (auto& transaction : transactionMap) {
+ const std::shared_ptr<Connectable>& target = transaction.first;
+ std::shared_ptr<Connection> connection = std::dynamic_pointer_cast<Connection>(target);
+ const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
+ auto& flows = transaction.second;
+ for (auto &ff : flows) {
+ if (shouldDropEmptyFiles && ff->getSize() == 0) {
+ // the receiver will drop this FF
+ continue;
+ }
+ FlowFileRecord event(flowFileRepo, contentRepo, ff, target->getUUIDStr());
+
+ std::unique_ptr<io::DataStream> stream(new io::DataStream());
+ event.Serialize(*stream);
+
+ flowData.emplace_back(event.getUUIDStr(), std::move(stream));
+ }
+ }
+
+ if (!flowFileRepo->MultiPut(flowData)) {
+ logger_->log_error("Failed to execute MultiPut on FF repo!");
+ throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
+ }
+
+ for (auto& transaction : transactionMap) {
+ const std::shared_ptr<Connectable>& target = transaction.first;
+ std::shared_ptr<Connection> connection = std::dynamic_pointer_cast<Connection>(target);
+ const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
+ auto& flows = transaction.second;
+ for (auto &ff : flows) {
+ auto snapshotIt = originalFlowFileSnapShots.find(ff->getUUIDStr());
+ auto original = snapshotIt != originalFlowFileSnapShots.end() ? snapshotIt->second : nullptr;
+ if (shouldDropEmptyFiles && ff->getSize() == 0) {
+ // the receiver promised to drop this FF, no need for it anymore
+ if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
+ // original must be non-null since this flowFile is already stored in the repos ->
+ // must have come from a session->get()
+ auto claim = original->getResourceClaim();
+ // decrement on behalf of the persisted-instance-to-be-deleted
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
+ ff->setStoredToRepository(false);
+ }
+ continue;
+ }
+ auto claim = ff->getResourceClaim();
+ // increment on behalf of the persisted instance
+ if (claim) claim->increaseFlowFileRecordOwnedCount();
+ auto originalClaim = original ? original->getResourceClaim() : nullptr;
+ // decrement on behalf of the overridden instance if any
+ if (ff->isStored() && originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
+
+ ff->setStoredToRepository(true);
+ }
+ }
+}
+
std::shared_ptr<core::FlowFile> ProcessSession::get() {
std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->pickIncomingConnection();
@@ -949,6 +983,12 @@
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " expire flow record " << record->getUUIDStr();
provenance_report_->expire(record, details.str());
+ // there is no rolling back expired FlowFiles
+ if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
+ record->setStoredToRepository(false);
+ auto claim = record->getResourceClaim();
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
+ }
}
}
if (ret) {
@@ -964,9 +1004,9 @@
snapshot->setAttribute(attr, flow_version->getFlowId());
}
logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
- snapshot = ret;
+ *snapshot = *ret;
// save a snapshot
- _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+ _flowFileSnapShots[snapshot->getUUIDStr()] = snapshot;
return ret;
}
current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode()->pickIncomingConnection());
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 6ce3123..9556000 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -238,7 +238,8 @@
// We already has connection for this relationship
std::set<std::shared_ptr<Connectable>> existedConnection = connection_pair.second;
const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const std::shared_ptr<Connectable>& conn) {
- return std::static_pointer_cast<Connection>(conn)->isFull();
+ auto connection = std::dynamic_pointer_cast<Connection>(conn);
+ return connection && connection->isFull();
});
if (has_full_connection) { return true; }
}
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
new file mode 100644
index 0000000..f40a180
--- /dev/null
+++ b/libminifi/test/BufferReader.h
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+ explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+
+ int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+ uint8_t tmpBuffer[4096]{};
+ std::size_t remaining_len = len;
+ int total_read = 0;
+ while (remaining_len > 0) {
+ auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
+ if (ret == 0) break;
+ if (ret < 0) return ret;
+ remaining_len -= ret;
+ total_read += ret;
+ auto prevSize = buffer_.size();
+ buffer_.resize(prevSize + ret);
+ std::move(tmpBuffer, tmpBuffer + ret, buffer_.data() + prevSize);
+ }
+ return total_read;
+ }
+
+ int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+ return write(*stream.get(), stream->getSize());
+ }
+
+ private:
+ std::vector<uint8_t>& buffer_;
+};
+
+#endif //NIFI_MINIFI_CPP_BUFFERREADER_H
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index e6920fb..0173ba2 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -464,6 +464,7 @@
{
auto session = std::make_shared<core::ProcessSession>(context);
processor->onTrigger(context, session);
+ session->commit();
}
// validate the merge content
std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -751,7 +752,3 @@
}
LogTestController::getInstance().reset();
}
-
-
-
-
diff --git a/libminifi/test/flow-tests/CMakeLists.txt b/libminifi/test/flow-tests/CMakeLists.txt
index 905b38b..e377aff 100644
--- a/libminifi/test/flow-tests/CMakeLists.txt
+++ b/libminifi/test/flow-tests/CMakeLists.txt
@@ -34,7 +34,7 @@
target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
MATH(EXPR FLOW_TEST_COUNT "${FLOW_TEST_COUNT}+1")
- add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}")
ENDFOREACH()
message("-- Finished building ${FLOW_TEST_COUNT} flow test file(s)...")
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 6bd6699..903a384 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -124,11 +124,11 @@
auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
- // prevent the initial trigger
- // in case the source got triggered
- // and the scheduler triggers the sink
- // before we could initiate the shutdown
- sinkProc->yield(100);
+ std::promise<void> execSinkPromise;
+ std::future<void> execSinkFuture = execSinkPromise.get_future();
+ sinkProc->onTriggerCb_ = [&] {
+ execSinkFuture.wait();
+ };
testController.startFlow();
@@ -140,8 +140,8 @@
REQUIRE(root->getTotalFlowFileCount() == 3);
REQUIRE(sourceProc->trigger_count.load() == 1);
- REQUIRE(sinkProc->trigger_count.load() == 0);
+ execSinkPromise.set_value();
controller->stop(true);
REQUIRE(sourceProc->trigger_count.load() == 1);
@@ -158,12 +158,10 @@
auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
- // prevent the initial trigger
- // in case the source got triggered
- // and the scheduler triggers the sink
- sinkProc->yield(100);
-
+ std::promise<void> execSinkPromise;
+ std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc->onTriggerCb_ = [&]{
+ execSinkFuture.wait();
static std::atomic<bool> first_onTrigger{true};
bool isFirst = true;
// sleep only on the first trigger
@@ -182,8 +180,8 @@
REQUIRE(root->getTotalFlowFileCount() == 3);
REQUIRE(sourceProc->trigger_count.load() == 1);
- REQUIRE(sinkProc->trigger_count.load() == 0);
+ execSinkPromise.set_value();
controller->stop(true);
REQUIRE(sourceProc->trigger_count.load() == 1);
@@ -202,12 +200,10 @@
auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
- // prevent the initial trigger
- // in case the source got triggered
- // and the scheduler triggers the sink
- sinkProc->yield(100);
-
+ std::promise<void> execSinkPromise;
+ std::future<void> execSinkFuture = execSinkPromise.get_future();
sinkProc->onTriggerCb_ = [&]{
+ execSinkFuture.wait();
static std::atomic<bool> first_onTrigger{true};
bool isFirst = true;
// sleep only on the first trigger
@@ -226,9 +222,9 @@
REQUIRE(root->getTotalFlowFileCount() == 3);
REQUIRE(sourceProc->trigger_count.load() == 1);
- REQUIRE(sinkProc->trigger_count.load() == 0);
std::thread shutdownThread([&]{
+ execSinkPromise.set_value();
controller->stop(true);
});
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index aee610d..b91c849 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -56,6 +56,8 @@
}
~TestControllerWithFlow() {
+ controller_->stop(true);
+ controller_->unload();
LogTestController::getInstance().reset();
}
diff --git a/libminifi/test/persistence-tests/CMakeLists.txt b/libminifi/test/persistence-tests/CMakeLists.txt
new file mode 100644
index 0000000..a2ae85c
--- /dev/null
+++ b/libminifi/test/persistence-tests/CMakeLists.txt
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB PERSISTENCE_TESTS "*.cpp")
+SET(PERSISTENCE_TEST_COUNT 0)
+FOREACH(testfile ${PERSISTENCE_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/libarchive")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/rocksdb-repos/")
+ target_include_directories(${testfilename} BEFORE PRIVATE "${ROCKSDB_THIRDPARTY_ROOT}/include")
+ target_wholearchive_library(${testfilename} minifi-archive-extensions)
+ target_wholearchive_library(${testfilename} minifi-standard-processors)
+ target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+ createTests("${testfilename}")
+ target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+ MATH(EXPR PERSISTENCE_TEST_COUNT "${PERSISTENCE_TEST_COUNT}+1")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${PERSISTENCE_TEST_COUNT} persistence-related test file(s)...")
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
new file mode 100644
index 0000000..b032410
--- /dev/null
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -0,0 +1,319 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "FlowFileRepository.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "../../extensions/libarchive/MergeContent.h"
+#include "../BufferReader.h"
+
+using Connection = minifi::Connection;
+using MergeContent = minifi::processors::MergeContent;
+
+struct TestFlow{
+ TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
+ const std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
+ : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
+ std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+
+ // setup processor
+ {
+ processor = processorGenerator(mainProcUUID());
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+ processorContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+ }
+
+ // setup INPUT processor
+ {
+ inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
+ std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
+ inputContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo,
+ ff_repository, content_repo);
+ }
+
+ // setup Input Connection
+ {
+ input = std::make_shared<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
+ input->setRelationship({"input", "d"});
+ input->setDestinationUUID(mainProcUUID());
+ input->setSourceUUID(inputProcUUID());
+ inputProcessor->addConnection(input);
+ }
+
+ // setup Output Connection
+ {
+ output = std::make_shared<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
+ output->setRelationship(relationshipToOutput);
+ output->setSourceUUID(mainProcUUID());
+ }
+
+ // setup ProcessGroup
+ {
+ root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+ root->addProcessor(processor);
+ root->addConnection(input);
+ root->addConnection(output);
+ }
+
+ // prepare Merge Processor for execution
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ processor->onSchedule(processorContext.get(), new core::ProcessSessionFactory(processorContext));
+ }
+ std::shared_ptr<core::FlowFile> write(const std::string& data) {
+ minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+ core::ProcessSession sessionGenFlowFile(inputContext);
+ std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+ sessionGenFlowFile.importFrom(stream, flow);
+ assert(flow->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
+ sessionGenFlowFile.transfer(flow, {"input", "d"});
+ sessionGenFlowFile.commit();
+ return flow;
+ }
+ std::string read(const std::shared_ptr<core::FlowFile>& file) {
+ core::ProcessSession session(processorContext);
+ std::vector<uint8_t> buffer;
+ BufferReader reader(buffer);
+ session.read(file, &reader);
+ return {buffer.data(), buffer.data() + buffer.size()};
+ }
+ void trigger() {
+ auto session = std::make_shared<core::ProcessSession>(processorContext);
+ processor->onTrigger(processorContext, session);
+ session->commit();
+ }
+
+ std::shared_ptr<Connection> input;
+ std::shared_ptr<Connection> output;
+ std::shared_ptr<core::ProcessGroup> root;
+
+ private:
+ static utils::Identifier& mainProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+ static utils::Identifier& inputProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+ static utils::Identifier& inputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+ static utils::Identifier& outputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+
+ std::shared_ptr<core::Processor> inputProcessor;
+ std::shared_ptr<core::Processor> processor;
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository;
+ std::shared_ptr<core::ContentRepository> content_repo;
+ std::shared_ptr<core::Repository> prov_repo;
+ std::shared_ptr<core::ProcessContext> inputContext;
+ std::shared_ptr<core::ProcessContext> processorContext;
+};
+
+std::shared_ptr<MergeContent> setupMergeProcessor(const utils::Identifier& id) {
+ auto processor = std::make_shared<MergeContent>("MergeContent", id);
+ processor->initialize();
+ processor->setAutoTerminatedRelationships({{"original", "d"}});
+
+ processor->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+ processor->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+ processor->setProperty(MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+ processor->setProperty(MergeContent::MinEntries, "3");
+ processor->setProperty(MergeContent::Header, "_Header_");
+ processor->setProperty(MergeContent::Footer, "_Footer_");
+ processor->setProperty(MergeContent::Demarcator, "_Demarcator_");
+ processor->setProperty(MergeContent::MaxBinAge, "1 h");
+
+ return processor;
+}
+
+TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+ LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/tmp/test.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ {
+ TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
+
+ flowController->load(flow.root);
+ ff_repository->start();
+
+ // write two files into the input
+ flow.write("one");
+ flow.write("two");
+ // capture them with the Merge Processor
+ flow.trigger();
+ flow.trigger();
+
+ ff_repository->stop();
+ flowController->unload();
+
+ // check if the processor has taken ownership
+ std::set<std::shared_ptr<core::FlowFile>> expired;
+ auto file = flow.input->poll(expired);
+ REQUIRE(!file);
+ REQUIRE(expired.empty());
+
+ file = flow.output->poll(expired);
+ REQUIRE(!file);
+ REQUIRE(expired.empty());
+ }
+
+ // swap the ProcessGroup and restart the FlowController
+ {
+ TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
+
+ flowController->load(flow.root);
+ ff_repository->start();
+ // wait for FlowFileRepository to start and notify the owners of
+ // the resurrected FlowFiles
+ std::this_thread::sleep_for(std::chrono::milliseconds{100});
+
+ // write the third file into the input
+ flow.write("three");
+
+ flow.trigger();
+ ff_repository->stop();
+ flowController->unload();
+
+ std::set<std::shared_ptr<core::FlowFile>> expired;
+ auto file = flow.output->poll(expired);
+ REQUIRE(file);
+ REQUIRE(expired.empty());
+
+ auto content = flow.read(file);
+ auto isOneOfPossibleResults =
+ Catch::Equals("_Header_one_Demarcator_two_Demarcator_three_Footer_")
+ || Catch::Equals("_Header_two_Demarcator_one_Demarcator_three_Footer_");
+
+ REQUIRE_THAT(content, isOneOfPossibleResults);
+ }
+}
+
+class ContentUpdaterProcessor : public core::Processor{
+ public:
+ ContentUpdaterProcessor(const std::string& name, utils::Identifier& id) : Processor(name, id) {}
+ void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+ auto ff = session->get();
+ std::string data = "<override>";
+ minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+ session->importFrom(stream, ff);
+ session->transfer(ff, {"success", "d"});
+ }
+};
+
+std::shared_ptr<core::Processor> setupContentUpdaterProcessor(utils::Identifier& id) {
+ return std::make_shared<ContentUpdaterProcessor>("Updater", id);
+}
+
+TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
+ TestController testController;
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+ LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+ LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+ char format[] = "/tmp/test.XXXXXX";
+ auto dir = testController.createTempDirectory(format);
+
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+ std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+ std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ ff_repository->initialize(config);
+ content_repo->initialize(config);
+
+ auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
+ auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+ {
+ TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
+
+ flowController->load(flow.root);
+ ff_repository->start();
+
+ // write a single file into the input
+ auto flowFile = flow.write("data");
+ auto claim = flowFile->getResourceClaim();
+ // one from the FlowFile and one from the persisted instance
+ REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
+ // update it with the ContentUpdater processor
+ flow.trigger();
+
+ auto content = flow.read(flowFile);
+ REQUIRE(content == "<override>");
+ auto newClaim = flowFile->getResourceClaim();
+ // the processor added new content to the flowFile
+ REQUIRE(claim != newClaim);
+ // nobody holds an owning reference to the previous claim
+ REQUIRE(claim->getFlowFileRecordOwnedCount() == 0);
+ // one from the FlowFile and one from the persisted instance
+ REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
+
+ ff_repository->stop();
+ flowController->unload();
+ }
+
+ // swap the ProcessGroup and restart the FlowController
+ {
+ TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
+
+ flowController->load(flow.root);
+ ff_repository->start();
+ // wait for FlowFileRepository to start and notify the owners of
+ // the resurrected FlowFiles
+ std::this_thread::sleep_for(std::chrono::milliseconds{100});
+
+ std::set<std::shared_ptr<core::FlowFile>> expired;
+ auto file = flow.output->poll(expired);
+ REQUIRE(file);
+ REQUIRE(expired.empty());
+
+ auto content = flow.read(file);
+ REQUIRE(content == "<override>");
+ // the still persisted instance and this FlowFile
+ REQUIRE(file->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+
+ ff_repository->stop();
+ flowController->unload();
+ }
+}
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 7521d6f..b48ebdd 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -164,23 +164,22 @@
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
- minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
+ {
+ minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
- record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+ record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
- record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
+ record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
- REQUIRE(record.Serialize());
+ REQUIRE(record.Serialize());
- claim->decreaseFlowFileRecordOwnedCount();
+ REQUIRE(repository->Delete(record.getUUIDStr()));
+ claim->decreaseFlowFileRecordOwnedCount();
- claim->decreaseFlowFileRecordOwnedCount();
+ repository->flush();
- repository->Delete(record.getUUIDStr());
-
- repository->flush();
-
- repository->stop();
+ repository->stop();
+ }
std::ifstream fileopen(ss.str(), std::ios::in);
REQUIRE(!fileopen.good());
@@ -272,7 +271,9 @@
ff_repository->initialize(config);
content_repo->initialize(config);
+ core::Relationship inputRel{"Input", "dummy"};
std::shared_ptr<minifi::Connection> input = std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+ input->setRelationship(inputRel);
auto root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
root->addConnection(input);
@@ -292,13 +293,18 @@
*/
{
std::shared_ptr<core::Processor> processor = std::make_shared<core::Processor>("dummy");
+ utils::Identifier uuid;
+ REQUIRE(processor->getUUID(uuid));
+ input->setSourceUUID(uuid);
+ processor->addConnection(input);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
sessionGenFlowFile.importFrom(content, flow);
- input->put(flow); // stores it in the flowFileRepository
+ sessionGenFlowFile.transfer(flow, inputRel);
+ sessionGenFlowFile.commit();
}
// remove flow from the connection but it is still present in the