MINIFICPP-1340 - Batch persist content changes
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #887
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index c3042f6..2d4284d 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -22,6 +22,7 @@
#include "RocksDbStream.h"
#include "rocksdb/merge_operator.h"
#include "utils/GeneralUtils.h"
+#include "Exception.h"
namespace org {
namespace apache {
@@ -65,13 +66,53 @@
db_.reset();
}
+DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : ContentSession(std::move(repository)) {}
+
+std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
+ return std::make_shared<Session>(sharedFromThis());
+}
+
+void DatabaseContentRepository::Session::commit() {
+ auto dbContentRepository = std::static_pointer_cast<DatabaseContentRepository>(repository_);
+ auto opendb = dbContentRepository->db_->open();
+ if (!opendb) {
+ throw Exception(REPOSITORY_EXCEPTION, "Couldn't open rocksdb database to commit content changes");
+ }
+ rocksdb::WriteBatch batch;
+ for (const auto& resource : managedResources_) {
+ auto outStream = dbContentRepository->write(*resource.first, false, &batch);
+ if (outStream == nullptr) {
+ throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
+ }
+ const auto size = resource.second->getSize();
+ if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+ throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
+ }
+ }
+ for (const auto& resource : extendedResources_) {
+ auto outStream = dbContentRepository->write(*resource.first, true, &batch);
+ if (outStream == nullptr) {
+ throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
+ }
+ const auto size = resource.second->getSize();
+ if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+ throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath());
+ }
+ }
+
+ rocksdb::WriteOptions options;
+ options.sync = true;
+ rocksdb::Status status = opendb->Write(options, &batch);
+ if (!status.ok()) {
+ throw Exception(REPOSITORY_EXCEPTION, "Batch write failed: " + status.ToString());
+ }
+
+ managedResources_.clear();
+ extendedResources_.clear();
+}
+
std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
- // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
- // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
- if (!is_valid_ || !db_)
- return nullptr;
- // append is already supported in all modes
- return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
+ return write(claim, append, nullptr);
}
std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::ResourceClaim &claim) {
@@ -109,7 +150,7 @@
rocksdb::Status status;
status = opendb->Delete(rocksdb::WriteOptions(), claim.getContentFullPath());
if (status.ok()) {
- logger_->log_debug("Deleted %s", claim.getContentFullPath());
+ logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
return true;
} else {
logger_->log_debug("Attempted, but could not delete %s", claim.getContentFullPath());
@@ -117,6 +158,15 @@
}
}
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim& claim, bool append, rocksdb::WriteBatch* batch) {
+ // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
+ // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
+ if (!is_valid_ || !db_)
+ return nullptr;
+ // append is already supported in all modes
+ return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
+}
+
} /* namespace repository */
} /* namespace core */
} /* namespace minifi */
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 6abc476..a0bd595 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -26,6 +26,7 @@
#include "properties/Configure.h"
#include "core/logging/LoggerConfiguration.h"
#include "RocksDatabase.h"
+#include "core/ContentSession.h"
namespace org {
namespace apache {
@@ -70,6 +71,12 @@
* DatabaseContentRepository is a content repository that stores data onto the local file system.
*/
class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
+ class Session : public ContentSession {
+ public:
+ explicit Session(std::shared_ptr<ContentRepository> repository);
+
+ void commit() override;
+ };
public:
DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), utils::Identifier uuid = utils::Identifier())
@@ -78,34 +85,36 @@
db_(nullptr),
logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
}
- virtual ~DatabaseContentRepository() {
+ ~DatabaseContentRepository() override {
stop();
}
- virtual bool initialize(const std::shared_ptr<minifi::Configure> &configuration);
+ std::shared_ptr<ContentSession> createSession() override;
- virtual void stop();
+ bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override;
- virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false);
+ void stop() override;
- virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
+ std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override;
- virtual bool close(const minifi::ResourceClaim &claim) {
+ std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override;
+
+ bool close(const minifi::ResourceClaim &claim) override {
return remove(claim);
}
- virtual bool remove(const minifi::ResourceClaim &claim);
+ bool remove(const minifi::ResourceClaim &claim) override;
- virtual bool exists(const minifi::ResourceClaim &streamId);
+ bool exists(const minifi::ResourceClaim &streamId) override;
- virtual void yield() {
+ void yield() override {
}
/**
* Determines if we are connected and operating
*/
- virtual bool isRunning() {
+ bool isRunning() override {
return true;
}
@@ -113,11 +122,13 @@
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
- virtual bool isWorkAvailable() {
+ bool isWorkAvailable() override {
return true;
}
private:
+ std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, rocksdb::WriteBatch* batch);
+
bool is_valid_;
std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index decb030..8fe6178 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -30,11 +30,12 @@
namespace minifi {
namespace io {
-RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable)
+RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, rocksdb::WriteBatch* batch)
: BaseStream(),
path_(std::move(path)),
write_enable_(write_enable),
- db_(std::move(db)),
+ db_(db),
+ batch_(batch),
logger_(logging::LoggerFactory<RocksDbStream>::getLogger()) {
auto opendb = db_->open();
exists_ = opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok();
@@ -71,11 +72,15 @@
rocksdb::Slice slice_value((const char *) value, size);
rocksdb::Status status;
size_ += size;
- rocksdb::WriteOptions opts;
- opts.sync = true;
- opendb->Merge(opts, path_, slice_value);
+ if (batch_ != nullptr) {
+ status = batch_->Merge(path_, slice_value);
+ } else {
+ rocksdb::WriteOptions opts;
+ opts.sync = true;
+ status = opendb->Merge(opts, path_, slice_value);
+ }
if (status.ok()) {
- return 0;
+ return size;
} else {
return -1;
}
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index c62d27b..a762753 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -46,7 +46,7 @@
* File Stream constructor that accepts an fstream shared pointer.
* It must already be initialized for read and write.
*/
- explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false);
+ explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false, rocksdb::WriteBatch* batch = nullptr);
~RocksDbStream() override {
closeStream();
@@ -162,6 +162,8 @@
gsl::not_null<minifi::internal::RocksDatabase*> db_;
+ rocksdb::WriteBatch* batch_;
+
size_t size_;
private:
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
index a2b0f89..ef85076 100644
--- a/libminifi/include/Exception.h
+++ b/libminifi/include/Exception.h
@@ -44,11 +44,12 @@
SITE2SITE_EXCEPTION,
GENERAL_EXCEPTION,
REGEX_EXCEPTION,
+ REPOSITORY_EXCEPTION,
MAX_EXCEPTION
};
static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", "Flow File Operation", "Processor Operation", "Process Session Operation", "Process Schedule Operation", "Site2Site Protocol",
- "General Operation", "Regex Operation" };
+ "General Operation", "Regex Operation", "Repository Operation" };
inline const char *ExceptionTypeToString(ExceptionType type) {
if (type < MAX_EXCEPTION)
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 2b9bbae..68c44fd 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -28,6 +28,8 @@
#include "io/BaseStream.h"
#include "StreamManager.h"
#include "core/Connectable.h"
+#include "ContentSession.h"
+#include "utils/GeneralUtils.h"
namespace org {
namespace apache {
@@ -38,7 +40,7 @@
/**
* Content repository definition that extends StreamManager.
*/
-class ContentRepository : public StreamManager<minifi::ResourceClaim> {
+class ContentRepository : public StreamManager<minifi::ResourceClaim>, public utils::EnableSharedFromThis<ContentRepository> {
public:
virtual ~ContentRepository() = default;
@@ -47,54 +49,22 @@
*/
virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
- virtual std::string getStoragePath() const {
- return directory_;
- }
+ virtual std::string getStoragePath() const;
+
+ virtual std::shared_ptr<ContentSession> createSession();
/**
* Stops this repository.
*/
virtual void stop() = 0;
- void reset() {
- std::lock_guard<std::mutex> lock(count_map_mutex_);
- count_map_.clear();
- }
+ void reset();
- virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId) {
- std::lock_guard<std::mutex> lock(count_map_mutex_);
- auto cnt = count_map_.find(streamId.getContentFullPath());
- if (cnt != count_map_.end()) {
- return cnt->second;
- } else {
- return 0;
- }
- }
+ virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId);
- virtual void incrementStreamCount(const minifi::ResourceClaim &streamId) {
- std::lock_guard<std::mutex> lock(count_map_mutex_);
- const std::string str = streamId.getContentFullPath();
- auto count = count_map_.find(str);
- if (count != count_map_.end()) {
- count_map_[str] = count->second + 1;
- } else {
- count_map_[str] = 1;
- }
- }
+ virtual void incrementStreamCount(const minifi::ResourceClaim &streamId);
- virtual StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) {
- std::lock_guard<std::mutex> lock(count_map_mutex_);
- const std::string str = streamId.getContentFullPath();
- auto count = count_map_.find(str);
- if (count != count_map_.end() && count->second > 1) {
- count_map_[str] = count->second - 1;
- return StreamState::Alive;
- } else {
- count_map_.erase(str);
- remove(streamId);
- return StreamState::Deleted;
- }
- }
+ virtual StreamState decrementStreamCount(const minifi::ResourceClaim &streamId);
protected:
std::string directory_;
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/ContentSession.h
new file mode 100644
index 0000000..a7144f3
--- /dev/null
+++ b/libminifi/include/core/ContentSession.h
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class ContentRepository;
+
+class ContentSession {
+ public:
+ enum class WriteMode {
+ OVERWRITE,
+ APPEND
+ };
+
+ explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+
+ std::shared_ptr<ResourceClaim> create();
+
+ std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+
+ std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+
+ virtual void commit();
+
+ void rollback();
+
+ virtual ~ContentSession() = default;
+
+ protected:
+ std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BaseStream>> managedResources_;
+ std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BaseStream>> extendedResources_;
+ std::shared_ptr<ContentRepository> repository_;
+};
+
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
+
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index e6e4f23..7d0f535 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -57,6 +57,7 @@
logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode()->getName());
auto repo = process_context_->getProvenanceRepository();
provenance_report_ = std::make_shared<provenance::ProvenanceReporter>(repo, process_context_->getProcessorNode()->getName(), process_context_->getProcessorNode()->getName());
+ content_session_ = process_context_->getContentRepository()->createSession();
}
// Destructor
@@ -70,6 +71,9 @@
std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() {
return provenance_report_;
}
+ // writes the created contents to the underlying repository
+ void flushContent();
+
// Get the FlowFile from the highest priority queue
virtual std::shared_ptr<core::FlowFile> get();
// Create a new UUID FlowFile with no content resource claim and without parent
@@ -165,6 +169,8 @@
// Provenance Report
std::shared_ptr<provenance::ProvenanceReporter> provenance_report_;
+ std::shared_ptr<ContentSession> content_session_;
+
static std::shared_ptr<utils::IdGenerator> id_generator_;
};
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 03ece73..445f482 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -30,6 +30,8 @@
#include "properties/Configure.h"
#include "core/Connectable.h"
#include "core/logging/LoggerConfiguration.h"
+#include "utils/GeneralUtils.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -41,7 +43,12 @@
* Purpose: Stages content into a volatile area of memory. Note that when the maximum number
* of entries is consumed we will rollback a session to wait for others to be freed.
*/
-class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<ResourceClaim::Path> {
+class VolatileContentRepository :
+ public core::ContentRepository,
+ public core::repository::VolatileRepository<ResourceClaim::Path>,
+ public utils::EnableSharedFromThis<VolatileContentRepository> {
+ using utils::EnableSharedFromThis<VolatileContentRepository>::sharedFromThis;
+
public:
static const char *minimal_locking;
@@ -109,11 +116,6 @@
virtual void run();
- template<typename T2>
- std::shared_ptr<T2> shared_from_parent() {
- return std::dynamic_pointer_cast<T2>(shared_from_this());
- }
-
private:
bool minimize_locking_;
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 1c65f0e..f826fee 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -35,7 +35,9 @@
* Volatile flow file repository. keeps a running counter of the current location, freeing
* those which we no longer hold.
*/
-class VolatileFlowFileRepository : public VolatileRepository<std::string> {
+class VolatileFlowFileRepository : public VolatileRepository<std::string>, public utils::EnableSharedFromThis<VolatileFlowFileRepository> {
+ using utils::EnableSharedFromThis<VolatileFlowFileRepository>::sharedFromThis;
+
public:
explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
MAX_REPOSITORY_STORAGE_SIZE,
@@ -59,7 +61,7 @@
if (purge_required_ && nullptr != content_repo_) {
std::lock_guard<std::mutex> lock(purge_mutex_);
for (auto purgeItem : purge_list_) {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(sharedFromThis(), content_repo_);
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
auto claim = eventRead->getResourceClaim();
if (claim) claim->decreaseFlowFileRecordOwnedCount();
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 31c0f87..a5b395a 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -51,7 +51,9 @@
* Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
*/
template<typename T>
-class VolatileRepository : public core::Repository, public std::enable_shared_from_this<VolatileRepository<T>> {
+class VolatileRepository : public core::Repository, public utils::EnableSharedFromThis<VolatileRepository<T>> {
+ using utils::EnableSharedFromThis<VolatileRepository<T>>::sharedFromThis;
+
public:
static const char *volatile_repo_max_count;
static const char *volatile_repo_max_bytes;
@@ -404,7 +406,7 @@
if (running_)
return;
running_ = true;
- thread_ = std::thread(&VolatileRepository<T>::run, std::enable_shared_from_this<VolatileRepository<T>>::shared_from_this());
+ thread_ = std::thread(&VolatileRepository<T>::run, sharedFromThis());
logger_->log_debug("%s Repository Monitor Thread Start", name_);
}
#if defined(__clang__)
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 1884a62..c772cd3 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -141,6 +141,9 @@
// data stream overrides
template<typename T>
int AtomicEntryStream<T>::writeData(uint8_t *value, int size) {
+ if (size == 0) {
+ return 0;
+ }
if (nullptr != value && !invalid_stream_) {
std::lock_guard<std::recursive_mutex> lock(entry_lock_);
if (entry_->insert(key_, value, size)) {
@@ -176,6 +179,9 @@
template<typename T>
int AtomicEntryStream<T>::readData(uint8_t *buf, int buflen) {
+ if (buflen == 0) {
+ return 0;
+ }
if (nullptr != buf && !invalid_stream_) {
std::lock_guard<std::recursive_mutex> lock(entry_lock_);
int len = buflen;
diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h
index 2d45724..ee91627 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -68,6 +68,30 @@
using std::void_t;
#endif /* < C++17 */
+namespace internal {
+
+/*
+ * We need this base class to enable safe multiple inheritance
+ * from std::enable_shared_from_this; it also needs to be polymorphic
+ * to allow dynamic_cast to the derived class.
+ */
+struct EnableSharedFromThisBase : std::enable_shared_from_this<EnableSharedFromThisBase> {
+ virtual ~EnableSharedFromThisBase() = default;
+};
+
+} // namespace internal
+
+/*
+ * The virtual inheritance ensures that each object contains
+ * only a single std::weak_ptr instance.
+ */
+template<typename T>
+struct EnableSharedFromThis : virtual internal::EnableSharedFromThisBase {
+ std::shared_ptr<T> sharedFromThis() {
+ return std::dynamic_pointer_cast<T>(internal::EnableSharedFromThisBase::shared_from_this());
+ }
+};
+
} // namespace utils
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/src/core/ContentRepository.cpp b/libminifi/src/core/ContentRepository.cpp
new file mode 100644
index 0000000..c4d06ed
--- /dev/null
+++ b/libminifi/src/core/ContentRepository.cpp
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "core/ContentRepository.h"
+#include "core/ContentSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+std::string ContentRepository::getStoragePath() const {
+ return directory_;
+}
+
+void ContentRepository::reset() {
+ std::lock_guard<std::mutex> lock(count_map_mutex_);
+ count_map_.clear();
+}
+
+std::shared_ptr<ContentSession> ContentRepository::createSession() {
+ return std::make_shared<ContentSession>(sharedFromThis());
+}
+
+uint32_t ContentRepository::getStreamCount(const minifi::ResourceClaim &streamId) {
+ std::lock_guard<std::mutex> lock(count_map_mutex_);
+ auto cnt = count_map_.find(streamId.getContentFullPath());
+ if (cnt != count_map_.end()) {
+ return cnt->second;
+ } else {
+ return 0;
+ }
+}
+
+void ContentRepository::incrementStreamCount(const minifi::ResourceClaim &streamId) {
+ std::lock_guard<std::mutex> lock(count_map_mutex_);
+ const std::string str = streamId.getContentFullPath();
+ auto count = count_map_.find(str);
+ if (count != count_map_.end()) {
+ count_map_[str] = count->second + 1;
+ } else {
+ count_map_[str] = 1;
+ }
+}
+
+ContentRepository::StreamState ContentRepository::decrementStreamCount(const minifi::ResourceClaim &streamId) {
+ std::lock_guard<std::mutex> lock(count_map_mutex_);
+ const std::string str = streamId.getContentFullPath();
+ auto count = count_map_.find(str);
+ if (count != count_map_.end() && count->second > 1) {
+ count_map_[str] = count->second - 1;
+ return StreamState::Alive;
+ } else {
+ count_map_.erase(str);
+ remove(streamId);
+ return StreamState::Deleted;
+ }
+}
+
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/src/core/ContentSession.cpp b/libminifi/src/core/ContentSession.cpp
new file mode 100644
index 0000000..f5934f9
--- /dev/null
+++ b/libminifi/src/core/ContentSession.cpp
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include "core/ContentRepository.h"
+#include "core/ContentSession.h"
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+#include "Exception.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ContentSession::ContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
+
+std::shared_ptr<ResourceClaim> ContentSession::create() {
+ std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(repository_);
+ managedResources_[claim] = std::make_shared<io::BaseStream>();
+ return claim;
+}
+
+std::shared_ptr<io::BaseStream> ContentSession::write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode) {
+ auto it = managedResources_.find(resourceId);
+ if (it == managedResources_.end()) {
+ if (mode == WriteMode::OVERWRITE) {
+ throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
+ }
+ auto& extension = extendedResources_[resourceId];
+ if (!extension) {
+ extension = std::make_shared<io::BaseStream>();
+ }
+ return extension;
+ }
+ if (mode == WriteMode::OVERWRITE) {
+ it->second = std::make_shared<io::BaseStream>();
+ }
+ return it->second;
+}
+
+std::shared_ptr<io::BaseStream> ContentSession::read(const std::shared_ptr<ResourceClaim>& resourceId) {
+ // TODO(adebreceni):
+ // after the stream refactor is merged we should be able to share the underlying buffer
+ // between multiple InputStreams, moreover create a ConcatInputStream
+ if (managedResources_.find(resourceId) != managedResources_.end() || extendedResources_.find(resourceId) != extendedResources_.end()) {
+ throw Exception(REPOSITORY_EXCEPTION, "Can only read non-modified resource");
+ }
+ return repository_->read(*resourceId);
+}
+
+void ContentSession::commit() {
+ for (const auto& resource : managedResources_) {
+ auto outStream = repository_->write(*resource.first);
+ if (outStream == nullptr) {
+ throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
+ }
+ const auto size = resource.second->getSize();
+ if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+ throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
+ }
+ }
+ for (const auto& resource : extendedResources_) {
+ auto outStream = repository_->write(*resource.first, true);
+ if (outStream == nullptr) {
+ throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
+ }
+ const auto size = resource.second->getSize();
+ if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+ throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath());
+ }
+ }
+
+ managedResources_.clear();
+ extendedResources_.clear();
+}
+
+void ContentSession::rollback() {
+ managedResources_.clear();
+ extendedResources_.clear();
+}
+
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
+
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index b0481a5..20cc9ea 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -241,11 +241,11 @@
}
void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+ std::shared_ptr<ResourceClaim> claim = content_session_->create();
try {
uint64_t startTime = utils::timeutils::getTimeMillis();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
+ std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
// Call the callback to write the content
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
@@ -280,7 +280,7 @@
try {
uint64_t startTime = utils::timeutils::getTimeMillis();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim, true);
+ std::shared_ptr<io::BaseStream> stream = content_session_->write(claim, ContentSession::WriteMode::APPEND);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
}
@@ -323,7 +323,7 @@
claim = flow->getResourceClaim();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(*claim);
+ std::shared_ptr<io::BaseStream> stream = content_session_->read(claim);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
@@ -350,13 +350,13 @@
*
*/
void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+ std::shared_ptr<ResourceClaim> claim = content_session_->create();
size_t max_read = getpagesize();
std::vector<uint8_t> charBuffer(max_read);
try {
auto startTime = utils::timeutils::getTimeMillis();
- std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(*claim);
+ std::shared_ptr<io::BaseStream> content_stream = content_session_->write(claim);
if (nullptr == content_stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
@@ -394,7 +394,7 @@
}
void ProcessSession::import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
- std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+ std::shared_ptr<ResourceClaim> claim = content_session_->create();
size_t size = getpagesize();
std::vector<uint8_t> charBuffer(size);
@@ -402,7 +402,8 @@
auto startTime = utils::timeutils::getTimeMillis();
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
+ std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
+
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
}
@@ -516,10 +517,10 @@
/* Create claim and stream if needed and append data */
if (claim == nullptr) {
startTime = utils::timeutils::getTimeMillis();
- claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+ claim = content_session_->create();
}
if (stream == nullptr) {
- stream = process_context_->getContentRepository()->write(*claim);
+ stream = content_session_->write(claim);
}
if (stream == nullptr) {
logger_->log_error("Stream is null");
@@ -799,6 +800,8 @@
}
}
+ content_session_->commit();
+
persistFlowFilesBeforeTransfer(connectionQueues, _flowFileSnapShots);
for (auto& cq : connectionQueues) {
@@ -869,6 +872,8 @@
}
}
+ content_session_->rollback();
+
_flowFileSnapShots.clear();
_clonedFlowFiles.clear();
@@ -997,6 +1002,10 @@
return nullptr;
}
+void ProcessSession::flushContent() {
+ content_session_->commit();
+}
+
bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship);
Connection * connection = nullptr;
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index d7039cf..b015f59 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -72,7 +72,7 @@
return;
if (running_)
return;
- thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>());
+ thread_ = std::thread(&VolatileContentRepository::run, sharedFromThis());
thread_.detach();
running_ = true;
logger_->log_info("%s Repository Monitor Thread Start", getName());
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index e3bdf08..8912f7b 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -33,7 +33,9 @@
int DataStream::writeData(uint8_t *value, int size) {
if (value == nullptr)
return 0;
- std::copy(value, value + size, std::back_inserter(buffer));
+ std::size_t previous_size = buffer.size();
+ buffer.resize(previous_size + size);
+ std::memcpy(buffer.data() + previous_size, value, size);
return size;
}
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index fe6e036..13d1707 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -119,32 +119,31 @@
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
- processor_ = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
- processor_->initialize();
+ processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+ processor->initialize();
utils::Identifier processoruuid;
- REQUIRE(true == processor_->getUUID(processoruuid));
+ REQUIRE(true == processor->getUUID(processoruuid));
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// connection from compress processor to log attribute
- output_ = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
- output_->addRelationship(core::Relationship("success", "compress successful output"));
- output_->setSource(processor_);
- output_->setSourceUUID(processoruuid);
- processor_->addConnection(output_);
+ output = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
+ output->addRelationship(core::Relationship("success", "compress successful output"));
+ output->setSource(processor);
+ output->setSourceUUID(processoruuid);
+ processor->addConnection(output);
// connection to compress processor
- input_ = std::make_shared<minifi::Connection>(repo, content_repo, "Input");
- input_->setDestination(processor_);
- input_->setDestinationUUID(processoruuid);
- processor_->addConnection(input_);
+ input = std::make_shared<minifi::Connection>(repo, content_repo, "Input");
+ input->setDestination(processor);
+ input->setDestinationUUID(processoruuid);
+ processor->addConnection(input);
- processor_->setAutoTerminatedRelationships({{"failure", ""}});
+ processor->setAutoTerminatedRelationships({{"failure", ""}});
- processor_->incrementActiveTasks();
- processor_->setScheduledState(core::ScheduledState::RUNNING);
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
- std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor_);
- context_ = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor), nullptr, repo, repo, content_repo);
}
public:
@@ -178,13 +177,15 @@
virtual ~CompressDecompressionTestController() = 0;
- std::shared_ptr<core::Processor> processor_;
- std::shared_ptr<core::ProcessContext> context_;
- std::shared_ptr<minifi::Connection> output_;
- std::shared_ptr<minifi::Connection> input_;
+ std::shared_ptr<core::Processor> processor;
+ std::shared_ptr<core::ProcessContext> context;
+ std::shared_ptr<minifi::Connection> output;
+ std::shared_ptr<minifi::Connection> input;
};
-CompressDecompressionTestController::~CompressDecompressionTestController() = default;
+CompressDecompressionTestController::~CompressDecompressionTestController() {
+ LogTestController::getInstance().reset();
+}
std::string CompressDecompressionTestController::tempDir_;
std::string CompressDecompressionTestController::raw_content_path_;
@@ -233,13 +234,7 @@
}
};
-TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
- CompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -247,7 +242,8 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -270,20 +266,13 @@
sessionGenFlowFile.read(flow1, &callback);
callback.archive_read();
std::string content(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(testController.getRawContent() == content);
+ REQUIRE(getRawContent() == content);
// write the compress content for next test
- testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {
- DecompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfiletest2]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -291,7 +280,8 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+ sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -312,18 +302,11 @@
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string content(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(testController.getRawContent() == content);
+ REQUIRE(getRawContent() == content);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("CompressFileBZip", "[compressfiletest3]") {
- CompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -331,7 +314,8 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -354,21 +338,14 @@
sessionGenFlowFile.read(flow1, &callback);
callback.archive_read();
std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(testController.getRawContent() == contents);
+ REQUIRE(getRawContent() == contents);
// write the compress content for next test
- testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {
- DecompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfiletest4]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -376,7 +353,8 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+ sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -397,18 +375,11 @@
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(testController.getRawContent() == contents);
+ REQUIRE(getRawContent() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {
- CompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -416,7 +387,8 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -445,21 +417,14 @@
sessionGenFlowFile.read(flow1, &callback);
callback.archive_read();
std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(testController.getRawContent() == contents);
+ REQUIRE(getRawContent() == contents);
// write the compress content for next test
- testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
- DecompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfiletest6]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -467,8 +432,9 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+ sessionGenFlowFile.import(compressedPath(), flow, true, 0);
flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -495,18 +461,11 @@
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(testController.getRawContent() == contents);
+ REQUIRE(getRawContent() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
- CompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletest7]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -514,7 +473,8 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -543,21 +503,14 @@
sessionGenFlowFile.read(flow1, &callback);
callback.archive_read();
std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
- REQUIRE(testController.getRawContent() == contents);
+ REQUIRE(getRawContent() == contents);
// write the compress content for next test
- testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+ writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
- DecompressTestController testController;
- auto context = testController.context_;
- auto input = testController.input_;
- auto processor = testController.processor_;
- auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfiletest8]") {
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -565,8 +518,9 @@
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+ sessionGenFlowFile.import(compressedPath(), flow, true, 0);
flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
+ sessionGenFlowFile.flushContent();
input->put(flow);
REQUIRE(processor->getName() == "compresscontent");
@@ -593,23 +547,21 @@
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
sessionGenFlowFile.read(flow1, &callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
- REQUIRE(testController.getRawContent() == contents);
+ REQUIRE(getRawContent() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("RawGzipCompressionDecompression", "[compressfiletest8]") {
- TestController testController;
+TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfiletest8]") {
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
// Create temporary directories
char format_src[] = "/tmp/archives.XXXXXX";
- std::string src_dir = testController.createTempDirectory(format_src);
+ std::string src_dir = createTempDirectory(format_src);
REQUIRE(!src_dir.empty());
char format_dst[] = "/tmp/archived.XXXXXX";
- std::string dst_dir = testController.createTempDirectory(format_dst);
+ std::string dst_dir = createTempDirectory(format_dst);
REQUIRE(!dst_dir.empty());
// Define files
@@ -618,7 +570,7 @@
std::string decompressed_file = utils::file::FileUtils::concat_path(dst_dir, "src.txt");
// Build MiNiFi processing graph
- auto plan = testController.createPlan();
+ auto plan = createPlan();
auto get_file = plan->addProcessor(
"GetFile",
"GetFile");
@@ -684,7 +636,7 @@
std::ofstream{ src_file } << content;
// Run flow
- testController.runSession(plan, true);
+ runSession(plan, true);
// Check compressed file
std::ifstream compressed(compressed_file, std::ios::in | std::ios::binary);
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 370987f..24572cd 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -136,31 +136,31 @@
public:
MergeTestController() {
init_file_paths();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<minifi::processors::MergeContent>();
+ LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
LogTestController::getInstance().setTrace<core::ProcessSession>();
LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
- LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+ LogTestController::getInstance().setTrace<minifi::processors::BinFiles>();
+ LogTestController::getInstance().setTrace<minifi::processors::Bin>();
+ LogTestController::getInstance().setTrace<minifi::processors::BinManager>();
+ LogTestController::getInstance().setTrace<minifi::Connection>();
+ LogTestController::getInstance().setTrace<minifi::core::Connectable>();
std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
- processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
- std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+ processor = std::make_shared<processors::MergeContent>("mergecontent");
processor->initialize();
utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ REQUIRE(processor->getUUID(processoruuid));
+ std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<minifi::processors::LogAttribute>("logattribute");
utils::Identifier logAttributeuuid;
- REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+ REQUIRE(logAttributeProcessor->getUUID(logAttributeuuid));
- auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
// output from merge processor to log attribute
output = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
- output->addRelationship(core::Relationship("merged", "Merge successful output"));
+ output->addRelationship(processors::MergeContent::Merge);
output->setSource(processor);
output->setDestination(logAttributeProcessor);
output->setSourceUUID(processoruuid);
@@ -172,36 +172,26 @@
input->setDestinationUUID(processoruuid);
processor->addConnection(input);
- std::set<core::Relationship> autoTerminatedRelationships;
- core::Relationship original("original", "");
- core::Relationship failure("failure", "");
- autoTerminatedRelationships.insert(original);
- autoTerminatedRelationships.insert(failure);
- processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+ processor->setAutoTerminatedRelationships({processors::MergeContent::Original, processors::MergeContent::Failure});
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
logAttributeProcessor->incrementActiveTasks();
logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
- node = std::make_shared<core::ProcessorNode>(processor);
- context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor), nullptr, repo, repo, content_repo);
}
- ~MergeTestController() = default;
+ ~MergeTestController() {
+ LogTestController::getInstance().reset();
+ }
+
std::shared_ptr<core::ProcessContext> context;
- std::shared_ptr<core::ProcessorNode> node;
std::shared_ptr<core::Processor> processor;
std::shared_ptr<minifi::Connection> input;
std::shared_ptr<minifi::Connection> output;
};
-TEST_CASE("MergeFileDefragment", "[mergefiletest1]") {
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
+TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -225,10 +215,8 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
- for (int i = 0; i < 6; i++) {
+ for (const int i : {0, 2, 5, 4, 1, 3}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -242,14 +230,9 @@
else
flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[2]);
- input->put(record[5]);
- input->put(record[4]);
- input->put(record[1]);
- input->put(record[3]);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
@@ -278,10 +261,9 @@
std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
REQUIRE(callback.to_string() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefiletest2]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -315,12 +297,6 @@
expectfileSecond << "footer";
}
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_FILENAME);
@@ -329,10 +305,8 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
- for (int i = 0; i < 6; i++) {
+ for (const int i : {0, 2, 5, 4, 1, 3}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -346,14 +320,9 @@
else
flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[2]);
- input->put(record[5]);
- input->put(record[4]);
- input->put(record[1]);
- input->put(record[3]);
REQUIRE(processor->getName() == "mergecontent");
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -383,10 +352,9 @@
std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
REQUIRE(callback.to_string() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefiletest3]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -407,24 +375,14 @@
}
}
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
- // Generate 6 flowfiles, first threes merged to one, second thress merged to one
- for (int i = 0; i < 6; i++) {
- if (i == 4)
- continue;
+ // Generate 5 flowfiles, first threes merged to one, the other two merged to one
+ for (const int i : {0, 2, 5, 1, 3}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -438,20 +396,14 @@
else
flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[2]);
- input->put(record[5]);
- input->put(record[1]);
- input->put(record[3]);
REQUIRE(processor->getName() == "mergecontent");
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
- for (int i = 0; i < 6; i++) {
- if (i == 4)
- continue;
+ for (int i = 0; i < 5; i++) {
auto session = std::make_shared<core::ProcessSession>(context);
processor->onTrigger(context, session);
session->commit();
@@ -482,10 +434,9 @@
std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
REQUIRE(callback.to_string() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -504,12 +455,6 @@
}
}
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -517,22 +462,15 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
- for (int i = 0; i < 6; i++) {
+ for (const int i : {0, 1, 2, 3, 4, 5}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
flow->setAttribute("tag", "tag");
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[1]);
- input->put(record[2]);
- input->put(record[3]);
- input->put(record[4]);
- input->put(record[5]);
REQUIRE(processor->getName() == "mergecontent");
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -562,11 +500,10 @@
std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
REQUIRE(callback.to_string() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("MergeFileTar", "[mergefiletest4]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -585,12 +522,6 @@
}
}
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -598,22 +529,15 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
- for (int i = 0; i < 6; i++) {
+ for (const int i : {0, 1, 2, 3, 4, 5}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
flow->setAttribute("tag", "tag");
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[1]);
- input->put(record[2]);
- input->put(record[3]);
- input->put(record[4]);
- input->put(record[5]);
REQUIRE(processor->getName() == "mergecontent");
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -653,10 +577,9 @@
REQUIRE(archives[i-3].to_string() == contents);
}
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("MergeFileZip", "[mergefiletest5]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -675,12 +598,6 @@
}
}
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_ZIP_VALUE);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -688,22 +605,15 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
// Generate 6 flowfiles, first threes merged to one, second thress merged to one
- for (int i = 0; i < 6; i++) {
+ for (const int i : {0, 1, 2, 3, 4, 5}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
flow->setAttribute("tag", "tag");
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[1]);
- input->put(record[2]);
- input->put(record[3]);
- input->put(record[4]);
- input->put(record[5]);
REQUIRE(processor->getName() == "mergecontent");
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -743,10 +653,9 @@
REQUIRE(archives[i-3].to_string() == contents);
}
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("MergeFileOnAttribute", "[mergefiletest5]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -761,12 +670,6 @@
}
}
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -774,10 +677,8 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[6];
-
// Generate 6 flowfiles, even files are merged to one, odd files are merged to an other
- for (int i = 0; i < 6; i++) {
+ for (const int i : {0, 1, 2, 3, 4, 5}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -785,14 +686,9 @@
flow->setAttribute("tag", "even");
else
flow->setAttribute("tag", "odd");
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[0]);
- input->put(record[1]);
- input->put(record[2]);
- input->put(record[3]);
- input->put(record[4]);
- input->put(record[5]);
REQUIRE(processor->getName() == "mergecontent");
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -820,16 +716,9 @@
std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
REQUIRE(callback.to_string() == contents);
}
- LogTestController::getInstance().reset();
}
-TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
+TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
@@ -849,10 +738,9 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[3];
// Generate 3 flowfiles merging all into one
- for (int i = 0; i < 3; i++) {
+ for (const int i : {1, 2, 0}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -871,11 +759,9 @@
}
flow->setAttribute("tagCommon", "common");
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[1]);
- input->put(record[2]);
- input->put(record[0]);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
@@ -894,17 +780,9 @@
REQUIRE(attributes.find("tagUnique2") == attributes.end());
REQUIRE(attributes["tagCommon"] == "common");
REQUIRE(attributes["mime.type"] == "application/tar");
-
- LogTestController::getInstance().reset();
}
-TEST_CASE("Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
- MergeTestController testController;
- auto context = testController.context;
- auto processor = testController.processor;
- auto input = testController.input;
- auto output = testController.output;
-
+TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
{
std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
@@ -925,10 +803,8 @@
context->setProperty(org::apache::nifi::minifi::processors::MergeContent::AttributeStrategy, org::apache::nifi::minifi::processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> record[3];
-
// Generate 3 flowfiles merging all into one
- for (int i = 0; i < 3; i++) {
+ for (const int i : {1, 2, 0}) {
const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -947,11 +823,9 @@
}
flow->setAttribute("tagCommon", "common");
- record[i] = flow;
+ sessionGenFlowFile.flushContent();
+ input->put(flow);
}
- input->put(record[1]);
- input->put(record[2]);
- input->put(record[0]);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
processor->onSchedule(context, factory);
@@ -970,6 +844,4 @@
REQUIRE(attributes["tagUnique2"] == "unique2");
REQUIRE(attributes["tagCommon"] == "common");
REQUIRE(attributes["mime.type"] == "application/tar");
-
- LogTestController::getInstance().reset();
}
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 1ba607b..6d376af 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -34,6 +34,7 @@
#include "../../extensions/libarchive/MergeContent.h"
#include "../test/BufferReader.h"
#include "core/repository/VolatileFlowFileRepository.h"
+#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
using Connection = minifi::Connection;
using MergeContent = minifi::processors::MergeContent;
@@ -250,6 +251,7 @@
LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
+ LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
char format[] = "/var/tmp/test.XXXXXX";
auto dir = testController.createTempDirectory(format);
@@ -269,6 +271,10 @@
testController.getLogger()->log_info("Using FileSystemRepository");
content_repo = std::make_shared<core::repository::FileSystemRepository>();
}
+ SECTION("DatabaseContentRepository") {
+ testController.getLogger()->log_info("Using DatabaseContentRepository");
+ content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ }
ff_repository->initialize(config);
content_repo->initialize(config);
@@ -298,6 +304,7 @@
// the processor added new content to the flowFile
REQUIRE(claim != newClaim);
// only this instance behind this shared_ptr keeps the resource alive
+ REQUIRE(claim.use_count() == 1);
REQUIRE(claim->getFlowFileRecordOwnedCount() == 1);
// one from the FlowFile and one from the persisted instance
REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
new file mode 100644
index 0000000..ecc987c
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "core/Core.h"
+#include "FileSystemRepository.h"
+#include "VolatileContentRepository.h"
+#include "DatabaseContentRepository.h"
+#include "FlowFileRecord.h"
+#include "../TestBase.h"
+
+template<typename ContentRepositoryClass>
+class ContentSessionController : public TestController {
+ public:
+ ContentSessionController() {
+ char format[] = "/var/tmp/content_repo.XXXXXX";
+ std::string contentRepoPath = createTempDirectory(format);
+ auto config = std::make_shared<minifi::Configure>();
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, contentRepoPath);
+ contentRepository = std::make_shared<ContentRepositoryClass>();
+ contentRepository->initialize(config);
+ }
+
+ ~ContentSessionController() {
+ log.reset();
+ }
+
+ std::shared_ptr<core::ContentRepository> contentRepository;
+};
+
+const std::shared_ptr<minifi::io::BaseStream>& operator<<(const std::shared_ptr<minifi::io::BaseStream>& stream, const std::string& str) {
+ REQUIRE(stream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), str.length()) == str.length());
+ return stream;
+}
+
+const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<minifi::io::BaseStream>& stream, std::string& str) {
+ str = "";
+ uint8_t buffer[4096]{};
+ while (true) {
+ size_t ret = stream->read(buffer, sizeof(buffer));
+ REQUIRE(ret >= 0);
+ if (ret == 0) {
+ break;
+ }
+ str += std::string{reinterpret_cast<char*>(buffer), ret};
+ }
+ return stream;
+}
+
+// TODO(adebreceni):
+// seems like the current version of Catch2 does not support templated tests
+// we should update instead of creating make-shift macros
+template<typename ContentRepositoryClass>
+void test_template() {
+ ContentSessionController<ContentRepositoryClass> controller;
+ std::shared_ptr<core::ContentRepository> contentRepository = controller.contentRepository;
+
+
+ std::shared_ptr<minifi::ResourceClaim> oldClaim;
+ {
+ auto session = contentRepository->createSession();
+ oldClaim = session->create();
+ session->write(oldClaim) << "data";
+ session->commit();
+ }
+
+ auto session = contentRepository->createSession();
+ REQUIRE_THROWS(session->write(oldClaim));
+
+ REQUIRE_NOTHROW(session->read(oldClaim));
+ session->write(oldClaim, core::ContentSession::WriteMode::APPEND) << "-addendum";
+ REQUIRE_THROWS(session->read(oldClaim)); // now throws because we appended to the content
+
+ auto claim1 = session->create();
+ session->write(claim1) << "hello content!";
+ REQUIRE_THROWS(session->read(claim1)); // TODO(adebreceni): we currently have no means to create joined streams
+
+ auto claim2 = session->create();
+ session->write(claim2, core::ContentSession::WriteMode::APPEND) << "beginning";
+ session->write(claim2, core::ContentSession::WriteMode::APPEND) << "-end";
+
+ auto claim3 = session->create();
+ session->write(claim3) << "first";
+ session->write(claim3, core::ContentSession::WriteMode::APPEND) << "-last";
+
+ auto claim4 = session->create();
+ session->write(claim4) << "beginning";
+ session->write(claim4) << "overwritten";
+
+ SECTION("Commit") {
+ session->commit();
+
+ std::string content;
+ contentRepository->read(*oldClaim) >> content;
+ REQUIRE(content == "data-addendum");
+
+ contentRepository->read(*claim1) >> content;
+ REQUIRE(content == "hello content!");
+
+ contentRepository->read(*claim2) >> content;
+ REQUIRE(content == "beginning-end");
+
+ contentRepository->read(*claim3) >> content;
+ REQUIRE(content == "first-last");
+
+ contentRepository->read(*claim4) >> content;
+ REQUIRE(content == "overwritten");
+ }
+
+ SECTION("Rollback") {
+ session->rollback();
+
+ std::string content;
+ contentRepository->read(*oldClaim) >> content;
+ REQUIRE(content == "data");
+
+ REQUIRE(!contentRepository->exists(*claim1));
+ REQUIRE(!contentRepository->exists(*claim2));
+ REQUIRE(!contentRepository->exists(*claim3));
+ REQUIRE(!contentRepository->exists(*claim4));
+ }
+}
+
+TEST_CASE("ContentSession behavior") {
+ SECTION("FileSystemRepository") {
+ test_template<core::repository::FileSystemRepository>();
+ }
+ SECTION("VolatileContentRepository") {
+ test_template<core::repository::VolatileContentRepository>();
+ }
+ SECTION("DatabaseContentRepository") {
+ test_template<core::repository::DatabaseContentRepository>();
+ }
+}
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index e6a0211..f4c5eb0 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -171,6 +171,7 @@
flowFile->setAttribute(kv.first, kv.second);
}
current_session->importFrom(*(input_ff_params->content_stream.get()), flowFile);
+ current_session->flushContent();
current_session->transfer(flowFile, core::Relationship("success", "success"));
relationships_[relationships_.size()-1]->put(std::static_pointer_cast<core::FlowFile>(flowFile));
}