MINIFICPP-1395 - Use Identifier instead of its stringified form wherever possible
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #932
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
index 9361e5e..fbdbfdf 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -40,19 +40,19 @@
std::shared_ptr<utils::IdGenerator> HttpSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator();
-const std::string HttpSiteToSiteClient::parseTransactionId(const std::string &uri) {
+utils::optional<utils::Identifier> HttpSiteToSiteClient::parseTransactionId(const std::string &uri) {
int i = 0;
for (i = uri.length() - 1; i >= 0; i--) {
if (uri.at(i) == '/')
break;
}
- return uri.substr(i + 1, uri.length() - (i + 1));
+ return utils::Identifier::parse(uri.substr(i + 1, uri.length() - (i + 1)));
}
-std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(std::string &transactionID, TransferDirection direction) {
+std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(TransferDirection direction) {
std::string dir_str = direction == SEND ? "input-ports" : "output-ports";
std::stringstream uri;
- uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions";
+ uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId().to_string() << "/transactions";
auto client = create_http_client(uri.str(), "POST");
client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
client->setConnectionTimeout(std::chrono::milliseconds(5000));
@@ -76,9 +76,9 @@
auto transaction = std::make_shared<HttpTransaction>(direction, std::move(crcstream));
transaction->initialize(this, url);
auto transactionId = parseTransactionId(url);
- if (IsNullOrEmpty(transactionId))
+ if (!transactionId)
return nullptr;
- transaction->setTransactionId(transactionId);
+ transaction->setTransactionId(transactionId.value());
std::shared_ptr<minifi::utils::HTTPClient> client;
if (transaction->getDirection() == SEND) {
client = openConnectionForSending(transaction);
@@ -90,9 +90,8 @@
client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
peer_->setStream(std::unique_ptr<io::BaseStream>(new io::HttpStream(client)));
- transactionID = transaction->getUUIDStr();
- logger_->log_debug("Created transaction id -%s-", transactionID);
- known_transactions_[transaction->getUUIDStr()] = transaction;
+ logger_->log_debug("Created transaction id -%s-", transaction->getUUID().to_string());
+ known_transactions_[transaction->getUUID()] = transaction;
return transaction;
}
} else {
@@ -155,13 +154,13 @@
}
}
} else if (transaction->getState() == TRANSACTION_CONFIRMED) {
- closeTransaction(transaction->getUUIDStr());
+ closeTransaction(transaction->getUUID());
code = CONFIRM_TRANSACTION;
}
return 1;
} else if (transaction->getState() == TRANSACTION_CONFIRMED) {
- closeTransaction(transaction->getUUIDStr());
+ closeTransaction(transaction->getUUID());
code = TRANSACTION_FINISHED;
return 1;
@@ -234,10 +233,10 @@
}
-void HttpSiteToSiteClient::closeTransaction(const std::string &transactionID) {
+void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transactionID) {
std::shared_ptr<Transaction> transaction = NULL;
- std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+ auto it = this->known_transactions_.find(transactionID);
if (it == known_transactions_.end()) {
return;
@@ -266,13 +265,13 @@
} else {
std::string directon = transaction->getDirection() == RECEIVE ? "Receive" : "Send";
logger_->log_error("Transaction %s to be closed is in unexpected state. Direction: %s, tranfers: %d, bytes: %llu, state: %d",
- transactionID, directon, transaction->total_transfers_, transaction->_bytes, transaction->getState());
+ transactionID.to_string(), directon, transaction->total_transfers_, transaction->_bytes, transaction->getState());
}
std::stringstream uri;
std::string dir_str = transaction->getDirection() == SEND ? "input-ports" : "output-ports";
- uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions/" << transactionID << "?responseCode=" << code;
+ uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId().to_string() << "/transactions/" << transactionID.to_string() << "?responseCode=" << code;
if (code == CONFIRM_TRANSACTION && data_received) {
uri << "&checksum=" << transaction->getCRC();
@@ -304,7 +303,7 @@
transaction->current_transfers_--;
}
-void HttpSiteToSiteClient::deleteTransaction(std::string transactionID) {
+void HttpSiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
closeTransaction(transactionID);
SiteToSiteClient::deleteTransaction(transactionID);
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h
index 7b1e790..171ddd9 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.h
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.h
@@ -98,7 +98,7 @@
return true;
}
- virtual std::shared_ptr<Transaction> createTransaction(std::string &transactionID, TransferDirection direction) override;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection direction) override;
// Transfer flow files for the process session
// virtual bool transferFlowFiles(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
@@ -106,7 +106,7 @@
virtual bool transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
std::map<std::string, std::string> attributes) override;
// deleteTransaction
- virtual void deleteTransaction(std::string transactionID) override;
+ virtual void deleteTransaction(const utils::Identifier& transactionID) override;
protected:
@@ -114,7 +114,7 @@
* Closes the transaction
* @param transactionID transaction id reference.
*/
- void closeTransaction(const std::string &transactionID);
+ void closeTransaction(const utils::Identifier &transactionID);
virtual int readResponse(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) override;
// write respond
@@ -155,7 +155,7 @@
virtual void tearDown() override;
- const std::string parseTransactionId(const std::string &uri);
+ utils::optional<utils::Identifier> parseTransactionId(const std::string &uri);
std::unique_ptr<utils::HTTPClient> create_http_client(const std::string &uri, const std::string &method = "POST", bool setPropertyHeaders = false) {
std::unique_ptr<utils::HTTPClient> http_client_ = std::unique_ptr<utils::HTTPClient>(new minifi::utils::HTTPClient(uri, ssl_context_service_));
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 14cd7f0..2a4220f 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -57,11 +57,11 @@
std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- utils::Identifier processoruuid;
- REQUIRE(true == listenhttp->getUUID(processoruuid));
+ utils::Identifier processoruuid = listenhttp->getUUID();
+ REQUIRE(processoruuid);
- utils::Identifier invokehttp_uuid;
- REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
+ utils::Identifier invokehttp_uuid = invokehttp->getUUID();
+ REQUIRE(invokehttp_uuid);
std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
gcConnection->addRelationship(core::Relationship("success", "description"));
@@ -173,11 +173,11 @@
std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- utils::Identifier processoruuid;
- REQUIRE(true == listenhttp->getUUID(processoruuid));
+ utils::Identifier processoruuid = listenhttp->getUUID();
+ REQUIRE(processoruuid);
- utils::Identifier invokehttp_uuid;
- REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
+ utils::Identifier invokehttp_uuid = invokehttp->getUUID();
+ REQUIRE(invokehttp_uuid);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index 55e5b57..b7f7e6f 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -116,7 +116,7 @@
return queue_.size();
}
- std::string getUUIDStr() {
+ utils::SmallString<36> getUUIDStr() {
return uuid_.to_string();
}
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index ebe1721..59a43e0 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -84,7 +84,7 @@
session->import(entryMetadata.tmpFileName, flowFile, false, 0);
utils::Identifier stashKeyUuid = id_generator_->generate();
logger_->log_debug("FocusArchiveEntry generated stash key %s for entry %s", stashKeyUuid.to_string(), entryMetadata.entryName);
- entryMetadata.stashKey.assign(stashKeyUuid.to_string());
+ entryMetadata.stashKey = stashKeyUuid.to_string();
if (entryMetadata.entryName == archiveMetadata.focusedEntry) {
targetEntryStashKey = entryMetadata.stashKey;
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index 72eaeaa..e537bc4 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -564,8 +564,8 @@
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
- utils::Identifier list_sftp_uuid;
- REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+ utils::Identifier list_sftp_uuid = list_sftp->getUUID();
+ REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
@@ -606,8 +606,8 @@
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
- utils::Identifier list_sftp_uuid;
- REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+ utils::Identifier list_sftp_uuid = list_sftp->getUUID();
+ REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
@@ -848,8 +848,8 @@
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
- utils::Identifier list_sftp_uuid;
- REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+ utils::Identifier list_sftp_uuid = list_sftp->getUUID();
+ REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
@@ -908,8 +908,8 @@
REQUIRE("nifi_test" == state.at("remote_path"));
REQUIRE("Tracking Entities" == state.at("listing_strategy"));
- utils::Identifier list_sftp_uuid;
- REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+ utils::Identifier list_sftp_uuid = list_sftp->getUUID();
+ REQUIRE(list_sftp_uuid);
createPlan(&list_sftp_uuid);
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
diff --git a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
index fa597ce..359f032 100644
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
@@ -57,8 +57,8 @@
std::shared_ptr<minifi::FlowController> controller = std::make_shared<
TestFlowController>(test_repo, test_repo, content_repo);
- utils::Identifier processoruuid;
- assert(true == processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ assert(processoruuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "executeProcessConnection");
connection->addRelationship(core::Relationship("success", "description"));
diff --git a/extensions/standard-processors/tests/unit/ExtractTextTests.cpp b/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
index 95f912a..62cf147 100644
--- a/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
+++ b/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
@@ -46,8 +46,8 @@
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExtractText>("processorname");
REQUIRE(processor->getName() == "processorname");
- utils::Identifier processoruuid;
- REQUIRE(processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
}
TEST_CASE("Test usage of ExtractText", "[extracttextTest]") {
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 5eb537f..141a197 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -63,11 +63,11 @@
processor->setStreamFactory(stream_factory);
processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
- utils::Identifier logattribute_uuid;
- REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
+ utils::Identifier logattribute_uuid = logAttribute->getUUID();
+ REQUIRE(logattribute_uuid);
REQUIRE(processoruuid.to_string() != logattribute_uuid.to_string());
@@ -175,11 +175,11 @@
processor->setStreamFactory(stream_factory);
processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
- utils::Identifier logattribute_uuid;
- REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
+ utils::Identifier logattribute_uuid = logAttribute->getUUID();
+ REQUIRE(logattribute_uuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
connection->addRelationship(core::Relationship("partial", "description"));
@@ -297,11 +297,11 @@
processor->setStreamFactory(stream_factory);
processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
- utils::Identifier logattribute_uuid;
- REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
+ utils::Identifier logattribute_uuid = logAttribute->getUUID();
+ REQUIRE(logattribute_uuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "gettcpexampleConnection");
connection->addRelationship(core::Relationship("success", "description"));
diff --git a/extensions/standard-processors/tests/unit/HashContentTest.cpp b/extensions/standard-processors/tests/unit/HashContentTest.cpp
index c699455..2504cb6 100644
--- a/extensions/standard-processors/tests/unit/HashContentTest.cpp
+++ b/extensions/standard-processors/tests/unit/HashContentTest.cpp
@@ -54,8 +54,7 @@
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::HashContent>("processorname");
REQUIRE(processor->getName() == "processorname");
- utils::Identifier processoruuid;
- REQUIRE(processor->getUUID(processoruuid));
+ REQUIRE(processor->getUUID());
}
TEST_CASE("Test usage of ExtractText", "[extracttextTest]") {
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 13350e5..9fefd92 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -65,8 +65,8 @@
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection");
@@ -150,8 +150,8 @@
char format[] = "/tmp/gt.XXXXXX";
const auto dir = testController.createTempDirectory(format);
- utils::Identifier processoruuid;
- REQUIRE(processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection");
@@ -249,8 +249,7 @@
connection->addRelationship(core::Relationship("success", "description"));
- utils::Identifier processoruuid;
- processor->getUUID(processoruuid);
+ utils::Identifier processoruuid = processor->getUUID();
// link the connections so that we can test results at the end for this
connection->setSource(processor);
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 5389888..dc71c5a 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -140,9 +140,8 @@
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("TailFile"));
- utils::Identifier uuid;
- rootFlowConfig->findProcessorByName("TailFile")->getUUID(uuid);
- REQUIRE(!uuid.isNil());
+ utils::Identifier uuid = rootFlowConfig->findProcessorByName("TailFile")->getUUID();
+ REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(
@@ -443,9 +442,8 @@
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("TailFile"));
- utils::Identifier uuid;
- rootFlowConfig->findProcessorByName("TailFile")->getUUID(uuid);
- REQUIRE(!uuid.isNil());
+ utils::Identifier uuid = rootFlowConfig->findProcessorByName("TailFile")->getUUID();
+ REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
@@ -496,9 +494,8 @@
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("PutFile"));
- utils::Identifier uuid;
- rootFlowConfig->findProcessorByName("PutFile")->getUUID(uuid);
- REQUIRE(!uuid.isNil());
+ utils::Identifier uuid = rootFlowConfig->findProcessorByName("PutFile")->getUUID();
+ REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("PutFile")->getUUIDStr().empty());
REQUIRE(LogTestController::getInstance().contains("[warning] Unable to set the dynamic property "
@@ -537,9 +534,8 @@
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("GetFile"));
- utils::Identifier uuid;
- rootFlowConfig->findProcessorByName("GetFile")->getUUID(uuid);
- REQUIRE(!uuid.isNil());
+ utils::Identifier uuid = rootFlowConfig->findProcessorByName("GetFile")->getUUID();
+ REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("GetFile")->getUUIDStr().empty());
} catch (const std::exception &e) {
caught_exception = true;
@@ -580,9 +576,8 @@
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("XYZ"));
- utils::Identifier uuid;
- rootFlowConfig->findProcessorByName("XYZ")->getUUID(uuid);
- REQUIRE(!uuid.isNil());
+ utils::Identifier uuid = rootFlowConfig->findProcessorByName("XYZ")->getUUID();
+ REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("XYZ")->getUUIDStr().empty());
}
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index 07dafef..214f47d 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -30,7 +30,7 @@
namespace processors {
static const std::string BOOKMARK_KEY = "bookmark";
-Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger)
+Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const utils::Identifier& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger)
: logger_(logger)
, state_manager_(state_manager) {
std::unordered_map<std::string, std::string> state_map;
@@ -39,7 +39,7 @@
} else if (!bookmarkRootDir.empty()) {
filePath_ = utils::file::FileUtils::concat_path(
utils::file::FileUtils::concat_path(
- utils::file::FileUtils::concat_path(bookmarkRootDir, "uuid"), uuid), "Bookmark.txt");
+ utils::file::FileUtils::concat_path(bookmarkRootDir, "uuid"), uuid.to_string()), "Bookmark.txt");
std::wstring bookmarkXml;
if (getBookmarkXmlFromFile(bookmarkXml)) {
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index c552421..d2e7742 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -38,7 +38,7 @@
class Bookmark {
public:
- Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger);
+ Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const utils::Identifier& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger);
~Bookmark();
explicit operator bool() const noexcept;
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 4a3d552..aee9cf8 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -285,7 +285,7 @@
logger_->log_error("State Directory is empty");
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "State Directory is empty");
}
- bookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), processOldEvents, state_manager_, logger_);
+ bookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUID(), processOldEvents, state_manager_, logger_);
if (!*bookmark_) {
bookmark_.reset();
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bookmark is empty");
diff --git a/extensions/windows-event-log/tests/BookmarkTests.cpp b/extensions/windows-event-log/tests/BookmarkTests.cpp
index 6889576..d42ebad 100644
--- a/extensions/windows-event-log/tests/BookmarkTests.cpp
+++ b/extensions/windows-event-log/tests/BookmarkTests.cpp
@@ -37,7 +37,7 @@
std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
const std::wstring &channel,
- const std::string &uuid = IdGenerator::getIdGenerator()->generate().to_string()) {
+ const utils::Identifier &uuid = IdGenerator::getIdGenerator()->generate()) {
const auto state_manager = test_plan.getStateManagerProvider()->getCoreComponentStateManager(uuid);
const auto logger = test_plan.getLogger();
return utils::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
@@ -114,7 +114,7 @@
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
LogTestController::getInstance().setTrace<TestPlan>();
- std::string uuid = IdGenerator::getIdGenerator()->generate().to_string();
+ utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
std::unique_ptr<Bookmark> bookmark_before = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid);
std::wstring bookmark_xml_before = bookmarkAsXml(bookmark_before);
@@ -132,12 +132,12 @@
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
LogTestController::getInstance().setTrace<TestPlan>();
- std::string uuid_one = IdGenerator::getIdGenerator()->generate().to_string();
+ utils::Identifier uuid_one = IdGenerator::getIdGenerator()->generate();
std::unique_ptr<Bookmark> bookmark_before = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_one);
reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
- std::string uuid_two = IdGenerator::getIdGenerator()->generate().to_string();
+ utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
// different uuid, so we get a new, empty, state manager
std::unique_ptr<Bookmark> bookmark_after = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two);
@@ -149,7 +149,7 @@
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
LogTestController::getInstance().setTrace<TestPlan>();
- std::string uuid = IdGenerator::getIdGenerator()->generate().to_string();
+ utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid);
std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid);
@@ -214,12 +214,12 @@
LogTestController::getInstance().setTrace<TestPlan>();
GIVEN("We have two different bookmarks with two different state managers") {
- std::string uuid_one = IdGenerator::getIdGenerator()->generate().to_string();
+ utils::Identifier uuid_one = IdGenerator::getIdGenerator()->generate();
std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_one);
reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
- std::string uuid_two = IdGenerator::getIdGenerator()->generate().to_string();
+ utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two);
REQUIRE(bookmarkAsXml(bookmark_one) != bookmarkAsXml(bookmark_two));
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index f484c9f..f54844a 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -66,12 +66,12 @@
dest_uuid_ = uuid;
}
// Get Source Processor UUID
- void getSourceUUID(utils::Identifier &uuid) {
- uuid = src_uuid_;
+ utils::Identifier getSourceUUID() const {
+ return src_uuid_;
}
// Get Destination Processor UUID
- void getDestinationUUID(utils::Identifier &uuid) {
- uuid = dest_uuid_;
+ utils::Identifier getDestinationUUID() const {
+ return dest_uuid_;
}
// Set Connection Source Processor
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 3f1e7e0..b0b9f1b 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -63,8 +63,8 @@
private:
std::mutex mutex_;
- std::map<std::string, Bosma::Cron> schedules_;
- std::map<std::string, std::chrono::system_clock::time_point> last_exec_;
+ std::map<utils::Identifier, Bosma::Cron> schedules_;
+ std::map<utils::Identifier, std::chrono::system_clock::time_point> last_exec_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent &parent);
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index cd1804e..b566ff8 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -168,10 +168,8 @@
return "FlowController";
}
- std::string getComponentUUID() const override {
- utils::Identifier ident;
- root_->getUUID(ident);
- return ident.to_string();
+ utils::Identifier getComponentUUID() const override {
+ return root_->getUUID();
}
// get version
@@ -283,7 +281,7 @@
* Returns controller service components referenced by serviceIdentifier from the embedded
* controller service provider;
*/
- std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId) override;
+ std::shared_ptr<core::controller::ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) override;
/**
* Enables all controller services for the provider.
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index fb5532d..6f6a3d9 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -90,9 +90,9 @@
// onTrigger, return whether the yield is need
bool onTrigger(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory);
// Whether agent has work to do
- bool hasWorkToDo(std::shared_ptr<core::Processor> processor);
+ bool hasWorkToDo(const std::shared_ptr<core::Processor>& processor);
// Whether the outgoing need to be backpressure
- bool hasTooMuchOutGoing(std::shared_ptr<core::Processor> processor);
+ bool hasTooMuchOutGoing(const std::shared_ptr<core::Processor>& processor);
// start
void start() {
running_ = true;
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 2e8edda..2b1b461 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -75,7 +75,7 @@
ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent);
std::shared_ptr<logging::Logger> logger_;
- std::set<std::string> processors_running_; // Set just for easy usage
+ std::set<utils::Identifier> processors_running_; // Set just for easy usage
};
} // namespace minifi
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index edcf096..5f38e48 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -238,7 +238,7 @@
utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
- std::vector<std::string> task_ids_;
+ std::vector<utils::Identifier> task_ids_;
bool manifest_sent_;
diff --git a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
index 08a16fa..6002bda 100644
--- a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
+++ b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
@@ -20,6 +20,7 @@
#include <unordered_map>
#include <string>
#include <memory>
+#include <map>
#include "core/Core.h"
#include "core/CoreComponentState.h"
@@ -35,23 +36,23 @@
public:
virtual ~AbstractCoreComponentStateManagerProvider();
- std::shared_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const std::string& uuid) override;
+ std::shared_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) override;
- std::unordered_map<std::string, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
+ std::map<utils::Identifier, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
class AbstractCoreComponentStateManager : public core::CoreComponentStateManager{
private:
std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider_;
- std::string id_;
+ utils::Identifier id_;
bool state_valid_;
- std::unordered_map<std::string, std::string> state_;
+ core::CoreComponentState state_;
public:
- AbstractCoreComponentStateManager(std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, const std::string& id);
+ AbstractCoreComponentStateManager(std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, const utils::Identifier& id);
- bool set(const std::unordered_map<std::string, std::string>& kvs) override;
+ bool set(const core::CoreComponentState& kvs) override;
- bool get(std::unordered_map<std::string, std::string>& kvs) override;
+ bool get(core::CoreComponentState& kvs) override;
bool clear() override;
@@ -59,14 +60,14 @@
};
protected:
- virtual bool setImpl(const std::string& key, const std::string& value) = 0;
- virtual bool getImpl(const std::string& key, std::string& value) = 0;
- virtual bool getImpl(std::unordered_map<std::string, std::string>& kvs) = 0;
- virtual bool removeImpl(const std::string& key) = 0;
+ virtual bool setImpl(const utils::Identifier& key, const std::string& serialized_state) = 0;
+ virtual bool getImpl(const utils::Identifier& key, std::string& serialized_state) = 0;
+ virtual bool getImpl(std::map<utils::Identifier, std::string>& kvs) = 0;
+ virtual bool removeImpl(const utils::Identifier& key) = 0;
virtual bool persistImpl() = 0;
- virtual std::string serialize(const std::unordered_map<std::string, std::string>& kvs);
- bool deserialize(const std::string& serialized, std::unordered_map<std::string, std::string>& kvs);
+ virtual std::string serialize(const core::CoreComponentState& kvs);
+ bool deserialize(const std::string& serialized, core::CoreComponentState& kvs);
};
} // namespace controllers
diff --git a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
index b183db6..ca2f0b4 100644
--- a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
+++ b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
@@ -19,6 +19,7 @@
#include <string>
#include <unordered_map>
+#include <map>
#include "KeyValueStoreService.h"
#include "AbstractCoreComponentStateManagerProvider.h"
@@ -40,10 +41,10 @@
virtual bool persist() = 0;
protected:
- bool setImpl(const std::string& key, const std::string& value) override;
- bool getImpl(const std::string& key, std::string& value) override;
- bool getImpl(std::unordered_map<std::string, std::string>& kvs) override;
- bool removeImpl(const std::string& key) override;
+ bool setImpl(const utils::Identifier& key, const std::string& serialized_state) override;
+ bool getImpl(const utils::Identifier& key, std::string& serialized_state) override;
+ bool getImpl(std::map<utils::Identifier, std::string>& kvs) override;
+ bool removeImpl(const utils::Identifier& key) override;
bool persistImpl() override;
};
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 6ca5303..f80bb9d 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -181,14 +181,11 @@
*/
void setUUID(const utils::Identifier& uuid);
- void setUUIDStr(const std::string &uuidStr);
-
/**
- * Returns the UUID through the provided object.
- * @param uuid uuid struct to which we will copy the memory
- * @return success of request
+ * Returns the UUID.
+ * @return the uuid of the component
*/
- bool getUUID(utils::Identifier &uuid) const;
+ utils::Identifier getUUID() const;
// unsigned const char *getUUID();
/**
diff --git a/libminifi/include/core/CoreComponentState.h b/libminifi/include/core/CoreComponentState.h
index 328148e..3174b40 100644
--- a/libminifi/include/core/CoreComponentState.h
+++ b/libminifi/include/core/CoreComponentState.h
@@ -23,6 +23,7 @@
#include <cstdint>
#include <memory>
#include <unordered_map>
+#include <map>
#include <string>
namespace org {
@@ -31,13 +32,15 @@
namespace minifi {
namespace core {
+using CoreComponentState = std::unordered_map<std::string, std::string>;
+
class CoreComponentStateManager {
public:
virtual ~CoreComponentStateManager() = default;
- virtual bool set(const std::unordered_map<std::string, std::string>& kvs) = 0;
+ virtual bool set(const CoreComponentState& kvs) = 0;
- virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+ virtual bool get(CoreComponentState& kvs) = 0;
virtual bool clear() = 0;
@@ -48,13 +51,13 @@
public:
virtual ~CoreComponentStateManagerProvider() = default;
- virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const std::string& uuid) = 0;
+ virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) = 0;
virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const CoreComponent& component) {
- return getCoreComponentStateManager(component.getUUIDStr());
+ return getCoreComponentStateManager(component.getUUID());
}
- virtual std::unordered_map<std::string, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() = 0;
+ virtual std::map<utils::Identifier, CoreComponentState> getAllCoreComponentStates() = 0;
};
} // namespace core
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 687ac0b..4ced9cd 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -85,7 +85,7 @@
/**
* Get lineage identifiers
*/
- std::vector<std::string> &getlineageIdentifiers();
+ std::vector<utils::Identifier> &getlineageIdentifiers();
/**
* Returns whether or not this flow file record
@@ -124,7 +124,7 @@
*/
void setLineageStartDate(const uint64_t date);
- void setLineageIdentifiers(const std::vector<std::string>& lineage_Identifiers) {
+ void setLineageIdentifiers(const std::vector<utils::Identifier>& lineage_Identifiers) {
lineage_Identifiers_ = lineage_Identifiers;
}
/**
@@ -278,7 +278,7 @@
// UUID string
// std::string uuid_str_;
// UUID string for all parents
- std::vector<std::string> lineage_Identifiers_;
+ std::vector<utils::Identifier> lineage_Identifiers_;
// Orginal connection queue that this flow file was dequeued from
std::shared_ptr<core::Connectable> connection_;
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 3e0da1a..d5164c3 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -172,7 +172,7 @@
* identifier
*/
std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
- return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
+ return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUID());
}
/**
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 6303da1..bb98c37 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -154,9 +154,9 @@
return config_version_;
}
// Start Processing
- void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
+ void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
// Stop Processing
- void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const std::shared_ptr<Processor>&)>& filter = [] (const std::shared_ptr<Processor>&) {return true;}); // NOLINT
+ void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const std::shared_ptr<Processor>&)>& filter = [] (const std::shared_ptr<Processor>&) {return true;}); // NOLINT
// Whether it is root process group
bool isRootProcessGroup();
// set parent process group
@@ -170,15 +170,15 @@
return parent_process_group_;
}
// Add processor
- void addProcessor(std::shared_ptr<Processor> processor);
+ void addProcessor(const std::shared_ptr<Processor>& processor);
// Remove processor
- void removeProcessor(std::shared_ptr<Processor> processor);
+ void removeProcessor(const std::shared_ptr<Processor>& processor);
// Add child processor group
void addProcessGroup(ProcessGroup *child);
// Remove child processor group
void removeProcessGroup(ProcessGroup *child);
// ! Add connections
- void addConnection(std::shared_ptr<Connection> connection);
+ void addConnection(const std::shared_ptr<Connection>& connection);
// Generic find
template <typename Fun>
std::shared_ptr<Processor> findProcessor(Fun condition) const {
@@ -216,7 +216,7 @@
std::shared_ptr<core::controller::ControllerServiceNode> findControllerService(const std::string &nodeId);
// removeConnection
- void removeConnection(std::shared_ptr<Connection> connection);
+ void removeConnection(const std::shared_ptr<Connection>& connection);
// update property value
void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue);
@@ -231,7 +231,7 @@
std::size_t getTotalFlowFileCount() const;
protected:
- void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
+ void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
// version
int config_version_;
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index b0b24dc..0844a09 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -252,15 +252,15 @@
* @param uuid uuid struct to which we will copy the memory
* @return success of request
*/
- bool getUUID(utils::Identifier &uuid) {
- return processor_->getUUID(uuid);
+ utils::Identifier getUUID() const {
+ return processor_->getUUID();
}
/**
* Return the UUID string
* @return the UUID str
*/
- std::string getUUIDStr() const {
+ utils::SmallString<36> getUUIDStr() const {
return processor_->getUUIDStr();
}
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 428b977..c6b6f04 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -175,7 +175,7 @@
* Returns a controller service for the service identifier and componentID
* @param service Identifier service identifier.
*/
- virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId) {
+ virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) {
std::shared_ptr<ControllerService> node = getControllerService(serviceIdentifier);
return node;
}
diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h
index 332151f..0474a2a 100644
--- a/libminifi/include/core/logging/Logger.h
+++ b/libminifi/include/core/logging/Logger.h
@@ -59,8 +59,11 @@
return arr.c_str();
}
-template<typename T>
-inline T conditional_conversion(T const& t) {
+template<typename T, typename = typename std::enable_if<
+ std::is_arithmetic<T>::value ||
+ std::is_enum<T>::value ||
+ std::is_pointer<T>::value>::type>
+inline T conditional_conversion(T t) {
return t;
}
diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h
index 33a7e30..abf01e6 100644
--- a/libminifi/include/core/state/ProcessorController.h
+++ b/libminifi/include/core/state/ProcessorController.h
@@ -46,8 +46,8 @@
return processor_->getName();
}
- virtual std::string getComponentUUID() const {
- return processor_->getUUIDStr();
+ virtual utils::Identifier getComponentUUID() const {
+ return processor_->getUUID();
}
std::shared_ptr<core::Processor> getProcessor() {
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index ad3f5d3..2c93657 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -154,7 +154,7 @@
virtual std::string getComponentName() const = 0;
- virtual std::string getComponentUUID() const = 0;
+ virtual utils::Identifier getComponentUUID() const = 0;
/**
* Start the client
*/
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 6075546..14ab127 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -476,7 +476,7 @@
SerializedResponseNode uuidNode;
uuidNode.name = "uuid";
- uuidNode.value = component->getComponentUUID();
+ uuidNode.value = std::string{component->getComponentUUID().to_string()};
SerializedResponseNode componentStatusNode;
componentStatusNode.name = "running";
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h
index ba3e5e3..890f661 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -240,7 +240,7 @@
SerializedResponseNode uuidNode;
uuidNode.name = "uuid";
- uuidNode.value = component->getComponentUUID();
+ uuidNode.value = std::string{component->getComponentUUID().to_string()};
SerializedResponseNode componentStatusNode;
componentStatusNode.name = "running";
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index 086a6ef..60a297c 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -22,6 +22,7 @@
#include <vector>
#include <string>
#include "Stream.h"
+#include "utils/Id.h"
namespace org {
namespace apache {
@@ -59,6 +60,13 @@
int read(bool& value);
/**
+ * read a uuid from stream
+ * @param value reference to the output
+ * @return resulting read size
+ **/
+ int read(utils::Identifier& value);
+
+ /**
* reads sizeof(Integral) bytes from the stream
* @param value reference in which will set the result
* @return resulting read size
diff --git a/libminifi/include/io/OutputStream.h b/libminifi/include/io/OutputStream.h
index ebee70c..f14a0b3 100644
--- a/libminifi/include/io/OutputStream.h
+++ b/libminifi/include/io/OutputStream.h
@@ -24,6 +24,7 @@
#include "Stream.h"
#include "utils/gsl.h"
#include "utils/SmallString.h"
+#include "utils/Id.h"
namespace org {
namespace apache {
@@ -56,6 +57,13 @@
int write(bool value);
/**
+ * write Identifier to stream
+ * @param value non encoded value
+ * @return resulting write size
+ **/
+ int write(const utils::Identifier& value);
+
+ /**
* write string to stream
* @param str string to write
* @return resulting write size
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 6633327..285a33f 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -170,12 +170,12 @@
// Destructor
virtual ~ProvenanceEventRecord() = default;
// Get the Event ID
- std::string getEventId() {
- return getUUIDStr();
+ utils::Identifier getEventId() {
+ return getUUID();
}
- void setEventId(const std::string &id) {
- setUUIDStr(id);
+ void setEventId(const utils::Identifier &id) {
+ setUUID(id);
}
// Get Attributes
std::map<std::string, std::string> getAttributes() {
@@ -222,7 +222,7 @@
return _componentType;
}
// Get FlowFileUuid
- std::string getFlowFileUuid() {
+ utils::Identifier getFlowFileUuid() {
return flow_uuid_;
}
// Get content full path
@@ -230,7 +230,7 @@
return _contentFullPath;
}
// Get LineageIdentifiers
- std::vector<std::string> getLineageIdentifiers() {
+ std::vector<utils::Identifier> getLineageIdentifiers() {
return _lineageIdentifiers;
}
// Get Details
@@ -258,36 +258,34 @@
_sourceSystemFlowFileIdentifier = identifier;
}
// Get Parent UUIDs
- std::vector<std::string> getParentUuids() {
+ std::vector<utils::Identifier> getParentUuids() {
return _parentUuids;
}
// Add Parent UUID
- void addParentUuid(std::string uuid) {
+ void addParentUuid(const utils::Identifier& uuid) {
if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end())
return;
else
_parentUuids.push_back(uuid);
}
// Add Parent Flow File
- void addParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
- addParentUuid(flow->getUUIDStr());
- return;
+ void addParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
+ addParentUuid(flow->getUUID());
}
// Remove Parent UUID
- void removeParentUuid(std::string uuid) {
+ void removeParentUuid(const utils::Identifier& uuid) {
_parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end());
}
// Remove Parent Flow File
- void removeParentFlowFile(std::shared_ptr<core::FlowFile> flow) {
- removeParentUuid(flow->getUUIDStr());
- return;
+ void removeParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
+ removeParentUuid(flow->getUUID());
}
// Get Children UUIDs
- std::vector<std::string> getChildrenUuids() {
+ std::vector<utils::Identifier> getChildrenUuids() {
return _childrenUuids;
}
// Add Child UUID
- void addChildUuid(std::string uuid) {
+ void addChildUuid(const utils::Identifier& uuid) {
if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end())
return;
else
@@ -295,17 +293,16 @@
}
// Add Child Flow File
void addChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
- addChildUuid(flow->getUUIDStr());
+ addChildUuid(flow->getUUID());
return;
}
// Remove Child UUID
- void removeChildUuid(std::string uuid) {
+ void removeChildUuid(const utils::Identifier& uuid) {
_childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end());
}
// Remove Child Flow File
void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
- removeChildUuid(flow->getUUIDStr());
- return;
+ removeChildUuid(flow->getUUID());
}
// Get AlternateIdentifierUri
std::string getAlternateIdentifierUri() {
@@ -336,7 +333,7 @@
_entryDate = flow->getEntryDate();
_lineageStartDate = flow->getlineageStartDate();
_lineageIdentifiers = flow->getlineageIdentifiers();
- flow_uuid_ = flow->getUUIDStr();
+ flow_uuid_ = flow->getUUID();
_attributes = flow->getAttributes();
_size = flow->getSize();
_offset = flow->getOffset();
@@ -407,7 +404,7 @@
// Size in bytes of the data corresponding to this flow file
uint64_t _size;
// flow uuid
- std::string flow_uuid_;
+ utils::Identifier flow_uuid_;
// Offset to the content
uint64_t _offset;
// Full path to the content
@@ -415,15 +412,15 @@
// Attributes key/values pairs for the flow record
std::map<std::string, std::string> _attributes;
// UUID string for all parents
- std::vector<std::string> _lineageIdentifiers;
+ std::vector<utils::Identifier> _lineageIdentifiers;
// transitUri
std::string _transitUri;
// sourceSystemFlowFileIdentifier
std::string _sourceSystemFlowFileIdentifier;
// parent UUID
- std::vector<std::string> _parentUuids;
+ std::vector<utils::Identifier> _parentUuids;
// child UUID
- std::vector<std::string> _childrenUuids;
+ std::vector<utils::Identifier> _childrenUuids;
// detail
std::string _details;
// sourceQueueIdentifier
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index 7935acb..4217be3 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -85,8 +85,8 @@
return secure_;
}
- void getPortId(utils::Identifier &other) const {
- other = port_id_;
+ utils::Identifier getPortId() const {
+ return port_id_;
}
protected:
diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h b/libminifi/include/sitetosite/RawSocketProtocol.h
index 348b2e0..6a3c722 100644
--- a/libminifi/include/sitetosite/RawSocketProtocol.h
+++ b/libminifi/include/sitetosite/RawSocketProtocol.h
@@ -158,7 +158,7 @@
// Creation of a new transaction, return the transaction ID if success,
// Return NULL when any error occurs
- virtual std::shared_ptr<Transaction> createTransaction(std::string &transactionID, TransferDirection direction);
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection direction);
//! Transfer string for the process session
virtual bool transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
@@ -186,7 +186,7 @@
std::atomic<uint64_t> _timeOut;
// commsIdentifier
- std::string _commsIdentifier;
+ utils::Identifier _commsIdentifier;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h
index d372fcd..f6bbf2b 100644
--- a/libminifi/include/sitetosite/SiteToSite.h
+++ b/libminifi/include/sitetosite/SiteToSite.h
@@ -246,24 +246,24 @@
current_transfers_ = 0;
total_transfers_ = 0;
_bytes = 0;
-
- // Generate the global UUID for the transaction
- uuid_str_ = uuid_.to_string();
}
// Destructor
virtual ~Transaction() = default;
// getUUIDStr
- std::string getUUIDStr() {
- return uuid_str_;
+ utils::SmallString<36> getUUIDStr() const {
+ return uuid_.to_string();
}
- void setTransactionId(const std::string str) {
- setUUIDStr(str);
+ utils::Identifier getUUID() const {
+ return uuid_;
}
- void setUUIDStr(const std::string &str) {
- uuid_str_ = str;
- uuid_ = str;
+ void setTransactionId(const utils::Identifier& id) {
+ setUUID(id);
+ }
+
+ void setUUID(const utils::Identifier &id) {
+ uuid_ = id;
}
// getState
@@ -323,8 +323,6 @@
// A global unique identifier
utils::Identifier uuid_;
- // UUID string
- std::string uuid_str_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
};
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index df967ee..4280b2f 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -88,7 +88,7 @@
* @param transactionID transaction identifier
* @param direction direction of transfer
*/
- virtual std::shared_ptr<Transaction> createTransaction(std::string &transactionID, TransferDirection direction) = 0;
+ virtual std::shared_ptr<Transaction> createTransaction(TransferDirection direction) = 0;
/**
* Transfers flow files
@@ -134,7 +134,7 @@
// Receive the data packet from the transaction
// Return false when any error occurs
- bool receive(std::string transactionID, DataPacket *packet, bool &eof);
+ bool receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof);
/**
* Transfers raw data and attributes to server
* @param context process context
@@ -148,7 +148,6 @@
void setPortId(utils::Identifier &id) {
port_id_ = id;
- port_id_str_ = port_id_.to_string();
}
/**
@@ -169,8 +168,8 @@
* Provides a reference to the port identifier
* @returns port identifier
*/
- const std::string getPortId() const {
- return port_id_str_;
+ utils::Identifier getPortId() const {
+ return port_id_;
}
/**
@@ -213,19 +212,19 @@
}
// Return -1 when any error occurs
- virtual int16_t send(std::string transactionID, DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const std::shared_ptr<core::ProcessSession> &session);
+ virtual int16_t send(const utils::Identifier& transactionID, DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const std::shared_ptr<core::ProcessSession> &session);
protected:
// Cancel the transaction
- virtual void cancel(std::string transactionID);
+ virtual void cancel(const utils::Identifier& transactionID);
// Complete the transaction
- virtual bool complete(std::string transactionID);
+ virtual bool complete(const utils::Identifier& transactionID);
// Error the transaction
- virtual void error(std::string transactionID);
+ virtual void error(const utils::Identifier& transactionID);
- virtual bool confirm(std::string transactionID);
+ virtual bool confirm(const utils::Identifier& transactionID);
// deleteTransaction
- virtual void deleteTransaction(std::string transactionID);
+ virtual void deleteTransaction(const utils::Identifier& transactionID);
virtual void tearDown() = 0;
@@ -246,9 +245,6 @@
// Peer State
PeerState peer_state_;
- // portIDStr
- std::string port_id_str_;
-
// portId
utils::Identifier port_id_;
@@ -261,7 +257,7 @@
std::atomic<bool> running_;
// transaction map
- std::map<std::string, std::shared_ptr<Transaction>> known_transactions_;
+ std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
// BATCH_SEND_NANOS
uint64_t _batchSendNanos;
diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h
index f37e864..3907846 100644
--- a/libminifi/include/sitetosite/SiteToSiteFactory.h
+++ b/libminifi/include/sitetosite/SiteToSiteFactory.h
@@ -57,8 +57,7 @@
* RawSiteToSiteClient will be instantiated and returned through a unique ptr.
*/
static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientConfiguration &client_configuration) {
- utils::Identifier uuid;
- client_configuration.getPeer()->getPortId(uuid);
+ utils::Identifier uuid = client_configuration.getPeer()->getPortId();
auto rsptr = createStreamingPeer(client_configuration);
if (nullptr == rsptr) {
return nullptr;
@@ -76,8 +75,7 @@
* @returns site to site client or nullptr.
*/
static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConfiguration &client_configuration) {
- utils::Identifier uuid;
- client_configuration.getPeer()->getPortId(uuid);
+ utils::Identifier uuid = client_configuration.getPeer()->getPortId();
switch (client_configuration.getClientType()) {
case RAW:
return createRawSocket(client_configuration);
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index bcd8119..e724ace 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -32,6 +32,7 @@
#include "core/logging/Logger.h"
#include "properties/Properties.h"
#include "OptionalUtils.h"
+#include "SmallString.h"
#define UUID_TIME_IMPL 0
#define UUID_RANDOM_IMPL 1
@@ -65,6 +66,10 @@
Identifier &operator=(const std::string& idStr);
+ explicit operator bool() const {
+ return !isNil();
+ }
+
bool operator!=(const Identifier& other) const;
bool operator==(const Identifier& other) const;
bool operator<(const Identifier& other) const;
diff --git a/libminifi/include/utils/SmallString.h b/libminifi/include/utils/SmallString.h
index a4c17c6..94259c1 100644
--- a/libminifi/include/utils/SmallString.h
+++ b/libminifi/include/utils/SmallString.h
@@ -34,6 +34,10 @@
return {c_str()};
}
+ constexpr size_t length() const noexcept {
+ return N;
+ }
+
const char* c_str() const {
return this->data();
}
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 93890d7..776a90f 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -45,6 +45,8 @@
namespace minifi {
namespace utils {
+using TaskId = std::string;
+
/**
* Worker task
* purpose: Provides a wrapper for the functor
@@ -53,7 +55,7 @@
template<typename T>
class Worker {
public:
- explicit Worker(const std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
+ explicit Worker(const std::function<T()> &task, const TaskId &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
: identifier_(identifier),
next_exec_time_(std::chrono::steady_clock::now()),
task(task),
@@ -61,7 +63,7 @@
promise = std::make_shared<std::promise<T>>();
}
- explicit Worker(const std::function<T()> &task, const std::string &identifier)
+ explicit Worker(const std::function<T()> &task, const TaskId &identifier)
: identifier_(identifier),
next_exec_time_(std::chrono::steady_clock::now()),
task(task),
@@ -69,7 +71,7 @@
promise = std::make_shared<std::promise<T>>();
}
- explicit Worker(const std::string identifier = "")
+ explicit Worker(const TaskId& identifier = {})
: identifier_(identifier),
next_exec_time_(std::chrono::steady_clock::now()) {
}
@@ -104,7 +106,7 @@
return true;
}
- virtual void setIdentifier(const std::string identifier) {
+ virtual void setIdentifier(const TaskId& identifier) {
identifier_ = identifier;
}
@@ -123,12 +125,12 @@
std::shared_ptr<std::promise<T>> getPromise() const;
- const std::string &getIdentifier() const {
+ const TaskId &getIdentifier() const {
return identifier_;
}
protected:
- std::string identifier_;
+ TaskId identifier_;
std::chrono::time_point<std::chrono::steady_clock> next_exec_time_;
std::function<T()> task;
std::unique_ptr<AfterExecute<T>> run_determinant_;
@@ -221,12 +223,12 @@
* @param identifier for worker tasks. Note that these tasks won't
* immediately stop.
*/
- void stopTasks(const std::string &identifier);
+ void stopTasks(const TaskId &identifier);
/**
* Returns true if a task is running.
*/
- bool isTaskRunning(const std::string &identifier) const {
+ bool isTaskRunning(const TaskId &identifier) const {
try {
return task_status_.at(identifier) == true;
} catch (const std::out_of_range &) {
@@ -341,7 +343,7 @@
// notification for new delayed tasks that's before the current ones
std::condition_variable delayed_task_available_;
// map to identify if a task should be
- std::map<std::string, bool> task_status_;
+ std::map<TaskId, bool> task_status_;
// manager mutex
std::recursive_mutex manager_mutex_;
// thread pool name
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index cb0f81c..21c071a 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -35,18 +35,18 @@
utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(const std::shared_ptr<core::Processor> &processor, const std::shared_ptr<core::ProcessContext> &processContext,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
if (this->running_ && processor->isRunning()) {
- auto uuidStr = processor->getUUIDStr();
+ auto uuid = processor->getUUID();
std::chrono::system_clock::time_point result;
std::chrono::system_clock::time_point from = std::chrono::system_clock::now();
{
std::lock_guard<std::mutex> locK(mutex_);
- auto sched_f = schedules_.find(uuidStr);
+ auto sched_f = schedules_.find(uuid);
if (sched_f != std::end(schedules_)) {
- result = last_exec_[uuidStr];
+ result = last_exec_[uuid];
if (from >= result) {
result = sched_f->second.cron_to_next(from);
- last_exec_[uuidStr] = result;
+ last_exec_[uuid] = result;
} else {
// we may be woken up a little early so that we can honor our time.
// in this case we can return the next time to run with the expectation
@@ -56,8 +56,8 @@
} else {
Bosma::Cron schedule(processor->getCronPeriod());
result = schedule.cron_to_next(from);
- last_exec_[uuidStr] = result;
- schedules_.insert(std::make_pair(uuidStr, schedule));
+ last_exec_[uuid] = result;
+ schedules_.insert(std::make_pair(uuid, schedule));
}
}
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index b3d8dc2..34718f3 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -866,7 +866,7 @@
* Returns controller service components referenced by serviceIdentifier from the embedded
* controller service provider;
*/
-std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(const std::string &serviceIdentifier, const std::string &componentId) {
+std::shared_ptr<core::controller::ControllerService> FlowController::getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) {
return controller_service_provider_->getControllerServiceForComponent(serviceIdentifier, componentId);
}
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index e3a224e..eec81bf 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -86,14 +86,16 @@
return false;
}
- ret = outStream.write(getUUIDStr());
+ ret = outStream.write(uuid_);
if (ret <= 0) {
return false;
}
utils::Identifier containerId;
- if (connection_) connection_->getUUID(containerId);
- ret = outStream.write(containerId.to_string());
+ if (connection_) {
+ containerId = connection_->getUUID();
+ }
+ ret = outStream.write(containerId);
if (ret <= 0) {
return false;
}
@@ -177,28 +179,15 @@
return {};
}
- std::string uuidStr;
- ret = inStream.read(uuidStr);
+ ret = inStream.read(file->uuid_);
if (ret <= 0) {
return {};
}
- utils::optional<utils::Identifier> parsedUUID = utils::Identifier::parse(uuidStr);
- if (!parsedUUID) {
- return {};
- }
- file->uuid_ = parsedUUID.value();
-
- std::string connectionUUIDStr;
- ret = inStream.read(connectionUUIDStr);
+ ret = inStream.read(container);
if (ret <= 0) {
return {};
}
- utils::optional<utils::Identifier> parsedConnectionUUID = utils::Identifier::parse(connectionUUIDStr);
- if (!parsedConnectionUUID) {
- return {};
- }
- container = parsedConnectionUUID.value();
// read flow attributes
uint32_t numAttributes = 0;
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 991da9d..107aa10 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -31,7 +31,7 @@
namespace nifi {
namespace minifi {
-bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
+bool SchedulingAgent::hasWorkToDo(const std::shared_ptr<core::Processor>& processor) {
// Whether it has work to do
if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->flowFilesQueued())
return true;
@@ -80,7 +80,7 @@
return future;
}
-bool SchedulingAgent::hasTooMuchOutGoing(std::shared_ptr<core::Processor> processor) {
+bool SchedulingAgent::hasTooMuchOutGoing(const std::shared_ptr<core::Processor>& processor) {
return processor->flowFilesOutGoingFull();
}
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp
index 3790865..7dda0b0 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -110,16 +110,15 @@
thread_pool_.execute(std::move(functor), future);
}
logger_->log_debug("Scheduled thread %d concurrent workers for for process %s", processor->getMaxConcurrentTasks(), processor->getName());
- processors_running_.insert(processor->getUUIDStr());
- return;
+ processors_running_.insert(processor->getUUID());
}
void ThreadedSchedulingAgent::stop() {
SchedulingAgent::stop();
std::lock_guard<std::mutex> lock(mutex_);
- for (const auto& p : processors_running_) {
- logger_->log_error("SchedulingAgent is stopped before processor was unscheduled: %s", p);
- thread_pool_.stopTasks(p);
+ for (const auto& processor_id : processors_running_) {
+ logger_->log_error("SchedulingAgent is stopped before processor was unscheduled: %s", processor_id.to_string());
+ thread_pool_.stopTasks(processor_id.to_string());
}
}
@@ -138,7 +137,7 @@
processor->setScheduledState(core::STOPPED);
- processors_running_.erase(processor->getUUIDStr());
+ processors_running_.erase(processor->getUUID());
}
} /* namespace minifi */
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 90094b9..09d4085 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -142,10 +142,9 @@
task_ids_.clear();
for (const auto& function : functions_) {
utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
- const std::string uuid_str = uuid.to_string();
- task_ids_.push_back(uuid_str);
+ task_ids_.push_back(uuid);
auto monitor = utils::make_unique<utils::ComplexMonitor>();
- utils::Worker<utils::TaskRescheduleInfo> functor(function, uuid_str, std::move(monitor));
+ utils::Worker<utils::TaskRescheduleInfo> functor(function, uuid.to_string(), std::move(monitor));
std::future<utils::TaskRescheduleInfo> future;
thread_pool_.execute(std::move(functor), future);
}
@@ -157,7 +156,7 @@
void C2Agent::stop() {
controller_running_ = false;
for (const auto& id : task_ids_) {
- thread_pool_.stopTasks(id);
+ thread_pool_.stopTasks(id.to_string());
}
thread_pool_.shutdown();
logger_->log_info("C2 agent stopped");
@@ -450,7 +449,7 @@
state_manager->persist();
component->start();
} else {
- logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID());
+ logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID().to_string());
}
}
} else {
@@ -604,7 +603,7 @@
auto core_component_states = state_manager_provider->getAllCoreComponentStates();
for (const auto& core_component_state : core_component_states) {
C2Payload state(Operation::ACKNOWLEDGE, resp.ident, false, true);
- state.setLabel(core_component_state.first);
+ state.setLabel(core_component_state.first.to_string());
for (const auto& kv : core_component_state.second) {
C2ContentResponse entry(Operation::ACKNOWLEDGE);
entry.name = kv.first;
diff --git a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
index 2cee77b..95f9bef 100644
--- a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
+++ b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
@@ -32,7 +32,7 @@
AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider,
- const std::string& id)
+ const utils::Identifier& id)
: provider_(std::move(provider))
, id_(id)
, state_valid_(false) {
@@ -42,7 +42,7 @@
}
}
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const std::unordered_map<std::string, std::string>& kvs) {
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const core::CoreComponentState& kvs) {
if (provider_->setImpl(id_, provider_->serialize(kvs))) {
state_valid_ = true;
state_ = kvs;
@@ -52,7 +52,7 @@
}
}
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(std::unordered_map<std::string, std::string>& kvs) {
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(core::CoreComponentState& kvs) {
if (!state_valid_) {
return false;
}
@@ -82,19 +82,19 @@
AbstractCoreComponentStateManagerProvider::~AbstractCoreComponentStateManagerProvider() = default;
-std::shared_ptr<core::CoreComponentStateManager> AbstractCoreComponentStateManagerProvider::getCoreComponentStateManager(const std::string& uuid) {
+std::shared_ptr<core::CoreComponentStateManager> AbstractCoreComponentStateManagerProvider::getCoreComponentStateManager(const utils::Identifier& uuid) {
return std::make_shared<AbstractCoreComponentStateManager>(shared_from_this(), uuid);
}
-std::unordered_map<std::string, std::unordered_map<std::string, std::string>> AbstractCoreComponentStateManagerProvider::getAllCoreComponentStates() {
- std::unordered_map<std::string, std::string> all_serialized;
+std::map<utils::Identifier, core::CoreComponentState> AbstractCoreComponentStateManagerProvider::getAllCoreComponentStates() {
+ std::map<utils::Identifier, std::string> all_serialized;
if (!getImpl(all_serialized)) {
return {};
}
- std::unordered_map<std::string, std::unordered_map<std::string, std::string>> all_deserialized;
+ std::map<utils::Identifier, core::CoreComponentState> all_deserialized;
for (const auto& serialized : all_serialized) {
- std::unordered_map<std::string, std::string> deserialized;
+ core::CoreComponentState deserialized;
if (deserialize(serialized.second, deserialized)) {
all_deserialized.emplace(serialized.first, std::move(deserialized));
}
@@ -103,7 +103,7 @@
return all_deserialized;
}
-std::string AbstractCoreComponentStateManagerProvider::serialize(const std::unordered_map<std::string, std::string>& kvs) {
+std::string AbstractCoreComponentStateManagerProvider::serialize(const core::CoreComponentState& kvs) {
rapidjson::Document doc(rapidjson::kObjectType);
rapidjson::Document::AllocatorType &alloc = doc.GetAllocator();
for (const auto& kv : kvs) {
@@ -117,7 +117,7 @@
return buffer.GetString();
}
-bool AbstractCoreComponentStateManagerProvider::deserialize(const std::string& serialized, std::unordered_map<std::string, std::string>& kvs) {
+bool AbstractCoreComponentStateManagerProvider::deserialize(const std::string& serialized, core::CoreComponentState& kvs) {
rapidjson::StringStream stream(serialized.c_str());
rapidjson::Document doc;
rapidjson::ParseResult res = doc.ParseStream(stream);
diff --git a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
index bf04606..6eb3745 100644
--- a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
+++ b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
@@ -16,6 +16,7 @@
*/
#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
@@ -29,20 +30,34 @@
PersistableKeyValueStoreService::~PersistableKeyValueStoreService() = default;
-bool PersistableKeyValueStoreService::setImpl(const std::string& key, const std::string& value) {
- return set(key, value);
+bool PersistableKeyValueStoreService::setImpl(const utils::Identifier& key, const std::string& serialized_state) {
+ return set(key.to_string(), serialized_state);
}
-bool PersistableKeyValueStoreService::getImpl(const std::string& key, std::string& value) {
- return get(key, value);
+bool PersistableKeyValueStoreService::getImpl(const utils::Identifier& key, std::string& serialized_state) {
+ return get(key.to_string(), serialized_state);
}
-bool PersistableKeyValueStoreService::getImpl(std::unordered_map<std::string, std::string>& kvs) {
- return get(kvs);
+bool PersistableKeyValueStoreService::getImpl(std::map<utils::Identifier, std::string>& kvs) {
+ std::unordered_map<std::string, std::string> states;
+ if (!get(states)) {
+ return false;
+ }
+ kvs.clear();
+ for (const auto& state : states) {
+ utils::optional<utils::Identifier> optional_uuid = utils::Identifier::parse(state.first);
+ if (optional_uuid) {
+ kvs[optional_uuid.value()] = state.second;
+ } else {
+ logging::LoggerFactory<PersistableKeyValueStoreService>::getLogger()
+ ->log_error("Found non-UUID key \"%s\" in storage implementation", state.first);
+ }
+ }
+ return true;
}
-bool PersistableKeyValueStoreService::removeImpl(const std::string& key) {
- return remove(key);
+bool PersistableKeyValueStoreService::removeImpl(const utils::Identifier& key) {
+ return remove(key.to_string());
}
bool PersistableKeyValueStoreService::persistImpl() {
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index b44149c..92ed687 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -31,16 +31,9 @@
uuid_ = uuid;
}
-void CoreComponent::setUUIDStr(const std::string &uuidStr) {
- uuid_ = uuidStr;
-}
// Get UUID
-bool CoreComponent::getUUID(utils::Identifier &uuid) const {
- if (uuid_.isNil()) {
- return false;
- }
- uuid = uuid_;
- return true;
+utils::Identifier CoreComponent::getUUID() const {
+ return uuid_;
}
// Set Processor Name
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index a6f3fb1..322f5a5 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -138,7 +138,7 @@
return lineage_start_date_;
}
-std::vector<std::string> &FlowFile::getlineageIdentifiers() {
+std::vector<utils::Identifier> &FlowFile::getlineageIdentifiers() {
return lineage_Identifiers_;
}
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index d62d858..892cdb1 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -98,7 +98,7 @@
return (type_ == ROOT_PROCESS_GROUP);
}
-void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) {
+void ProcessGroup::addProcessor(const std::shared_ptr<Processor>& processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) == processors_.end()) {
@@ -108,7 +108,7 @@
}
}
-void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) {
+void ProcessGroup::removeProcessor(const std::shared_ptr<Processor>& processor) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (processors_.find(processor) != processors_.end()) {
@@ -138,7 +138,7 @@
}
}
-void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler,
+void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler,
const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
@@ -190,7 +190,7 @@
}
}
-void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
+void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -213,7 +213,7 @@
}
}
-void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
+void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler,
const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const std::shared_ptr<Processor>&)>& filter) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -258,8 +258,8 @@
std::shared_ptr<Processor> ProcessGroup::findProcessorById(const utils::Identifier& uuid) const {
const auto id_matches = [&] (const std::shared_ptr<Processor>& processor) {
logger_->log_debug("Current processor is %s", processor->getName());
- utils::Identifier processorUUID;
- return processor->getUUID(processorUUID) && uuid == processorUUID;
+ utils::Identifier processorUUID = processor->getUUID();
+ return processorUUID && uuid == processorUUID;
};
return findProcessor(id_matches);
}
@@ -308,7 +308,6 @@
for (auto processGroup : child_process_groups_) {
processGroup->updatePropertyValue(processorName, propertyName, propertyValue);
}
- return;
}
void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) {
@@ -345,45 +344,33 @@
}
}
-void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
+void ProcessGroup::addConnection(const std::shared_ptr<Connection>& connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) == connections_.end()) {
// We do not have the same connection in this process group yet
connections_.insert(connection);
logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_);
- utils::Identifier sourceUUID;
- std::shared_ptr<Processor> source = NULL;
- connection->getSourceUUID(sourceUUID);
- source = this->findProcessorById(sourceUUID);
+ std::shared_ptr<Processor> source = this->findProcessorById(connection->getSourceUUID());
if (source)
source->addConnection(connection);
- std::shared_ptr<Processor> destination = NULL;
- utils::Identifier destinationUUID;
- connection->getDestinationUUID(destinationUUID);
- destination = this->findProcessorById(destinationUUID);
+ std::shared_ptr<Processor> destination = this->findProcessorById(connection->getDestinationUUID());
if (destination && destination != source)
destination->addConnection(connection);
}
}
-void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) {
+void ProcessGroup::removeConnection(const std::shared_ptr<Connection>& connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (connections_.find(connection) != connections_.end()) {
// We do not have the same connection in this process group yet
connections_.erase(connection);
logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_);
- utils::Identifier sourceUUID;
- std::shared_ptr<Processor> source = NULL;
- connection->getSourceUUID(sourceUUID);
- source = this->findProcessorById(sourceUUID);
+ std::shared_ptr<Processor> source = this->findProcessorById(connection->getSourceUUID());
if (source)
source->removeConnection(connection);
- std::shared_ptr<Processor> destination = NULL;
- utils::Identifier destinationUUID;
- connection->getDestinationUUID(destinationUUID);
- destination = this->findProcessorById(destinationUUID);
+ std::shared_ptr<Processor> destination = this->findProcessorById(connection->getDestinationUUID());
if (destination && destination != source)
destination->removeConnection(connection);
}
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 5309c8e..dc002f0 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -66,8 +66,7 @@
}
void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
- utils::Identifier uuid;
- record->getUUID(uuid);
+ utils::Identifier uuid = record->getUUID();
if (_updatedFlowFiles.find(uuid) != _updatedFlowFiles.end()) {
throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
}
@@ -93,11 +92,10 @@
}
record->setLineageStartDate(parent->getlineageStartDate());
record->setLineageIdentifiers(parent->getlineageIdentifiers());
- parent->getlineageIdentifiers().push_back(parent->getUUIDStr());
+ parent->getlineageIdentifiers().push_back(parent->getUUID());
}
- utils::Identifier uuid;
- record->getUUID(uuid);
+ utils::Identifier uuid = record->getUUID();
_addedFlowFiles[uuid] = record;
logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
std::stringstream details;
@@ -144,7 +142,7 @@
}
record->setLineageStartDate(parent->getlineageStartDate());
record->setLineageIdentifiers(parent->getlineageIdentifiers());
- record->getlineageIdentifiers().push_back(parent->getUUIDStr());
+ record->getlineageIdentifiers().push_back(parent->getUUID());
// Copy Resource Claim
std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
@@ -207,8 +205,7 @@
void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
- utils::Identifier uuid;
- flow->getUUID(uuid);
+ utils::Identifier uuid = flow->getUUID();
_transferRelationship[uuid] = relationship;
flow->setDeleted(false);
}
@@ -637,8 +634,7 @@
if (record->isDeleted()) {
return RouteResult::Ok_Deleted;
}
- utils::Identifier uuid;
- record->getUUID(uuid);
+ utils::Identifier uuid = record->getUUID();
auto itRelationship = _transferRelationship.find(uuid);
if (itRelationship == _transferRelationship.end()) {
return RouteResult::Error_NoRelationship;
@@ -867,8 +863,7 @@
const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
auto& flows = transaction.second;
for (auto &ff : flows) {
- utils::Identifier uuid;
- ff->getUUID(uuid);
+ utils::Identifier uuid = ff->getUUID();
auto snapshotIt = modifiedFlowFiles.find(uuid);
auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
if (shouldDropEmptyFiles && ff->getSize() == 0) {
@@ -924,8 +919,7 @@
std::shared_ptr<FlowFile> snapshot = std::make_shared<FlowFileRecord>();
*snapshot = *ret;
logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
- utils::Identifier uuid;
- ret->getUUID(uuid);
+ utils::Identifier uuid = ret->getUUID();
_updatedFlowFiles[uuid] = {ret, snapshot};
auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
if (flow_version != nullptr) {
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index ada191b..86c8384 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -124,14 +124,10 @@
}
});
- utils::Identifier srcUUID;
- utils::Identifier destUUID;
+ utils::Identifier srcUUID = connection->getSourceUUID();
+ utils::Identifier destUUID = connection->getDestinationUUID();
- connection->getSourceUUID(srcUUID);
- connection->getDestinationUUID(destUUID);
- std::string my_uuid = uuid_.to_string();
- std::string destination_uuid = destUUID.to_string();
- if (my_uuid == destination_uuid) {
+ if (uuid_ == destUUID) {
// Connection is destination to the current processor
if (_incomingConnections.find(connection) == _incomingConnections.end()) {
_incomingConnections.insert(connection);
@@ -141,8 +137,7 @@
result = SetAs::OUTPUT;
}
}
- std::string source_uuid = srcUUID.to_string();
- if (my_uuid == source_uuid) {
+ if (uuid_ == srcUUID) {
const auto &rels = connection->getRelationships();
for (auto i = rels.begin(); i != rels.end(); i++) {
const auto relationship = (*i).getName();
@@ -181,13 +176,10 @@
std::lock_guard<std::mutex> lock(getGraphMutex());
- utils::Identifier srcUUID;
- utils::Identifier destUUID;
-
std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn);
- connection->getSourceUUID(srcUUID);
- connection->getDestinationUUID(destUUID);
+ utils::Identifier srcUUID = connection->getSourceUUID();
+ utils::Identifier destUUID = connection->getDestinationUUID();
if (uuid_ == destUUID) {
// Connection is destination to the current processor
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index 044e3f4..ba7bc7b 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -28,17 +28,13 @@
: processor_(processor),
Connectable(processor->getName()),
ConfigurableComponent() {
- utils::Identifier copy;
- processor->getUUID(copy);
- setUUID(copy);
+ setUUID(processor->getUUID());
}
ProcessorNode::ProcessorNode(const ProcessorNode &other)
: processor_(other.processor_),
Connectable(other.getName()) {
- utils::Identifier copy;
- processor_->getUUID(copy);
- setUUID(copy);
+ setUUID(processor_->getUUID());
}
ProcessorNode::ProcessorNode(const ProcessorNode &&other)
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index c25fa64..c5b83af 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -68,16 +68,29 @@
parent.AddMember(keyVal, valueVal, alloc);
}
-rapidjson::Value getStringValue(const std::string& value, rapidjson::Document::AllocatorType& alloc) { // NOLINT
+rapidjson::Value getStringValue(const std::string& value, rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value Val;
Val.SetString(value.c_str(), value.length(), alloc);
return Val;
}
-void appendJsonStr(const std::string& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT
+template<size_t N>
+rapidjson::Value getStringValue(const utils::SmallString<N>& value, rapidjson::Document::AllocatorType& alloc) {
+ rapidjson::Value Val;
+ Val.SetString(value.c_str(), value.length(), alloc);
+ return Val;
+}
+
+void appendJsonStr(const std::string& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value valueVal;
- const char* c_val = value.c_str();
- valueVal.SetString(c_val, value.length(), alloc);
+ valueVal.SetString(value.c_str(), value.length(), alloc);
+ parent.PushBack(valueVal, alloc);
+}
+
+template<size_t N>
+void appendJsonStr(const utils::SmallString<N>& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) {
+ rapidjson::Value valueVal;
+ valueVal.SetString(value.c_str(), value.length(), alloc);
parent.PushBack(valueVal, alloc);
}
@@ -105,12 +118,12 @@
recordJson.AddMember("entityType", "org.apache.nifi.flowfile.FlowFile", alloc);
- recordJson.AddMember("eventId", getStringValue(record->getEventId(), alloc), alloc);
+ recordJson.AddMember("eventId", getStringValue(record->getEventId().to_string(), alloc), alloc);
recordJson.AddMember("eventType", getStringValue(provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()], alloc), alloc);
recordJson.AddMember("details", getStringValue(record->getDetails(), alloc), alloc);
recordJson.AddMember("componentId", getStringValue(record->getComponentId(), alloc), alloc);
recordJson.AddMember("componentType", getStringValue(record->getComponentType(), alloc), alloc);
- recordJson.AddMember("entityId", getStringValue(record->getFlowFileUuid(), alloc), alloc);
+ recordJson.AddMember("entityId", getStringValue(record->getFlowFileUuid().to_string(), alloc), alloc);
recordJson.AddMember("transitUri", getStringValue(record->getTransitUri(), alloc), alloc);
recordJson.AddMember("remoteIdentifier", getStringValue(record->getSourceSystemFlowFileIdentifier(), alloc), alloc);
recordJson.AddMember("alternateIdentifier", getStringValue(record->getAlternateIdentifierUri(), alloc), alloc);
@@ -121,12 +134,12 @@
recordJson.AddMember("updatedAttributes", updatedAttributesJson, alloc);
for (auto parentUUID : record->getParentUuids()) {
- appendJsonStr(parentUUID, parentUuidJson, alloc);
+ appendJsonStr(parentUUID.to_string(), parentUuidJson, alloc);
}
recordJson.AddMember("parentIds", parentUuidJson, alloc);
for (auto childUUID : record->getChildrenUuids()) {
- appendJsonStr(childUUID, childUuidJson, alloc);
+ appendJsonStr(childUUID.to_string(), childUuidJson, alloc);
}
recordJson.AddMember("childIds", childUuidJson, alloc);
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index a9161fc..3c80f37 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -619,7 +619,7 @@
// lastly, look the processor up by name
auto srcProcessor = parent->findProcessorByName(connectionSrcProcName);
if (NULL != srcProcessor) {
- srcProcessor->getUUID(srcUUID);
+ srcUUID = srcProcessor->getUUID();
logger_->log_debug("Using 'source name' to match source with same name for "
"connection '%s': source name => [%s]",
name, connectionSrcProcName);
@@ -657,7 +657,7 @@
// look the processor up by name
auto destProcessor = parent->findProcessorByName(connectionDestProcName);
if (NULL != destProcessor) {
- destProcessor->getUUID(destUUID);
+ destUUID = destProcessor->getUUID();
logger_->log_debug("Using 'destination name' to match destination with same name for "
"connection '%s': destination name => [%s]",
name, connectionDestProcName);
@@ -970,8 +970,7 @@
"of YAML::NodeType::Scalar.");
}
} else {
- utils::Identifier uuid = id_generator_->generate();
- id = uuid.to_string();
+ id = id_generator_->generate().to_string();
logger_->log_debug("Generating random ID: id => [%s]", id);
}
return id;
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index e395e83..9905abf 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -21,6 +21,7 @@
#include <string>
#include <algorithm>
#include "io/InputStream.h"
+#include "utils/OptionalUtils.h"
namespace org {
namespace apache {
@@ -47,6 +48,20 @@
return 1;
}
+int InputStream::read(utils::Identifier &value) {
+ std::string uuidStr;
+ int ret = read(uuidStr);
+ if (ret < 0) {
+ return ret;
+ }
+ auto optional_uuid = utils::Identifier::parse(uuidStr);
+ if (!optional_uuid) {
+ return -1;
+ }
+ value = optional_uuid.value();
+ return ret;
+}
+
int InputStream::read(std::string &str, bool widen) {
uint32_t len = 0;
int ret = 0;
diff --git a/libminifi/src/io/OutputStream.cpp b/libminifi/src/io/OutputStream.cpp
index efbc02f..4f82c63 100644
--- a/libminifi/src/io/OutputStream.cpp
+++ b/libminifi/src/io/OutputStream.cpp
@@ -41,6 +41,10 @@
return write(&temp, 1);
}
+int OutputStream::write(const utils::Identifier &value) {
+ return write(value.to_string());
+}
+
int OutputStream::write(const std::string& str, bool widen) {
return write_str(str.c_str(), str.length(), widen);
}
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 5ac140b..5bedec6 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -87,7 +87,7 @@
bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStream& outStream) {
int ret;
- ret = outStream.write(this->getUUIDStr());
+ ret = outStream.write(this->uuid_);
if (ret <= 0) {
return false;
}
@@ -236,14 +236,7 @@
org::apache::nifi::minifi::io::BufferStream outStream(buffer, bufferSize);
- std::string uuidStr;
- ret = outStream.read(uuidStr);
- utils::optional<utils::Identifier> uuid = utils::Identifier::parse(uuidStr);
- if (!uuid) {
- return false;
- }
- uuid_ = uuid.value();
-
+ ret = outStream.read(uuid_);
if (ret <= 0) {
return false;
}
@@ -291,7 +284,6 @@
}
ret = outStream.read(this->_details);
-
if (ret <= 0) {
return false;
}
@@ -346,7 +338,7 @@
}
for (uint32_t i = 0; i < number; i++) {
- std::string parentUUID;
+ utils::Identifier parentUUID;
ret = outStream.read(parentUUID);
if (ret <= 0) {
return false;
@@ -359,7 +351,7 @@
return false;
}
for (uint32_t i = 0; i < number; i++) {
- std::string childUUID;
+ utils::Identifier childUUID;
ret = outStream.read(childUUID);
if (ret <= 0) {
return false;
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 665b3d2..5f1b55c 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -111,7 +111,7 @@
return false;
}
- logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_str_, _currentVersion);
+ logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion);
int ret = peer_->write(getResourceName());
@@ -175,7 +175,7 @@
return false;
}
- logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_str_, _currentCodecVersion);
+ logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion);
int ret = peer_->write(getCodecResourceName());
@@ -233,8 +233,8 @@
logger_->log_error("Site2Site peer state is not established while handshake");
return false;
}
- logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_str_);
- _commsIdentifier = id_generator_->generate().to_string();
+ logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());
+ _commsIdentifier = id_generator_->generate();
int ret = peer_->write(_commsIdentifier);
@@ -244,7 +244,7 @@
std::map<std::string, std::string> properties;
properties[HandShakePropertyStr[GZIP]] = "false";
- properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_str_;
+ properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_.to_string();
properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeOut);
if (_currentVersion >= 5) {
if (_batchCount > 0)
@@ -314,7 +314,7 @@
}
// All known error cases handled here
- logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_str_, error);
+ logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
ret = -1;
return false;
}
@@ -425,7 +425,7 @@
return false;
}
- logger_->log_trace("Site2Site Protocol Negotiate Codec with destination port %s", port_id_str_);
+ logger_->log_trace("Site2Site Protocol Negotiate Codec with destination port %s", port_id_.to_string());
int status = writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
@@ -463,7 +463,7 @@
}
}
-std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(std::string &transactionID, TransferDirection direction) {
+std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(TransferDirection direction) {
int ret;
bool dataAvailable;
std::shared_ptr<Transaction> transaction = nullptr;
@@ -498,8 +498,7 @@
dataAvailable = true;
logger_->log_trace("Site2Site peer indicates that data is available");
transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
- known_transactions_[transaction->getUUIDStr()] = transaction;
- transactionID = transaction->getUUIDStr();
+ known_transactions_[transaction->getUUID()] = transaction;
transaction->setDataAvailable(dataAvailable);
logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
return transaction;
@@ -507,8 +506,7 @@
dataAvailable = false;
logger_->log_trace("Site2Site peer indicates that no data is available");
transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
- known_transactions_[transaction->getUUIDStr()] = transaction;
- transactionID = transaction->getUUIDStr();
+ known_transactions_[transaction->getUUID()] = transaction;
transaction->setDataAvailable(dataAvailable);
logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
return transaction;
@@ -524,8 +522,7 @@
} else {
org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(gsl::make_not_null(peer_.get()));
transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
- known_transactions_[transaction->getUUIDStr()] = transaction;
- transactionID = transaction->getUUIDStr();
+ known_transactions_[transaction->getUUID()] = transaction;
logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
return transaction;
}
@@ -552,8 +549,7 @@
}
// Create the transaction
- std::string transactionID;
- transaction = createTransaction(transactionID, SEND);
+ transaction = createTransaction(SEND);
if (transaction == NULL) {
context->yield();
@@ -561,22 +557,25 @@
throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
}
+ utils::Identifier transactionID = transaction->getUUID();
+
try {
DataPacket packet(getLogger(), transaction, attributes, payload);
int16_t resp = send(transactionID, &packet, nullptr, session);
if (resp == -1) {
- throw Exception(SITE2SITE_EXCEPTION, "Send Failed in transaction " + transactionID);
+ throw Exception(SITE2SITE_EXCEPTION, "Send Failed in transaction " + transactionID.to_string());
}
- logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID << " sent bytes length" << payload.length();
+ logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID.to_string() << " sent bytes length" << payload.length();
if (!confirm(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID);
+ throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID.to_string());
}
if (!complete(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID);
+ throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID.to_string());
}
- logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID << " successfully send flow record " << transaction->current_transfers_ << " content bytes " << transaction->_bytes;
+ logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID.to_string()
+ << " successfully send flow record " << transaction->current_transfers_ << " content bytes " << transaction->_bytes;
} catch (std::exception &exception) {
if (transaction)
deleteTransaction(transactionID);
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index d425b87..ae6e1f9 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -63,10 +63,10 @@
return 3 + message.size();
}
-void SiteToSiteClient::deleteTransaction(std::string transactionID) {
+void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
std::shared_ptr<Transaction> transaction = NULL;
- std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+ auto it = this->known_transactions_.find(transactionID);
if (it == known_transactions_.end()) {
return;
@@ -129,13 +129,13 @@
}
// Create the transaction
- std::string transactionID;
- transaction = createTransaction(transactionID, SEND);
+ transaction = createTransaction(SEND);
if (transaction == nullptr) {
context->yield();
tearDown();
throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
}
+ utils::Identifier transactionID = transaction->getUUID();
bool continueTransaction = true;
uint64_t startSendingNanos = utils::timeutils::getTimeNano();
@@ -151,7 +151,7 @@
throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
}
- logger_->log_debug("Site2Site transaction %s send flow record %s", transactionID, flow->getUUIDStr());
+ logger_->log_debug("Site2Site transaction %s send flow record %s", transactionID.to_string(), flow->getUUIDStr());
if (resp == 0) {
uint64_t endTime = utils::timeutils::getTimeMillis();
std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
@@ -172,12 +172,12 @@
} // while true
if (!confirm(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID);
+ throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + transactionID.to_string());
}
if (!complete(transactionID)) {
- throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID);
+ throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + transactionID.to_string());
}
- logger_->log_debug("Site2Site transaction %s successfully sent flow record %d, content bytes %llu", transactionID, transaction->total_transfers_, transaction->_bytes);
+ logger_->log_debug("Site2Site transaction %s successfully sent flow record %d, content bytes %llu", transactionID.to_string(), transaction->total_transfers_, transaction->_bytes);
} catch (std::exception &exception) {
if (transaction)
deleteTransaction(transactionID);
@@ -199,7 +199,7 @@
return true;
}
-bool SiteToSiteClient::confirm(std::string transactionID) {
+bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
int ret;
std::shared_ptr<Transaction> transaction = NULL;
@@ -211,7 +211,7 @@
return false;
}
- std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+ auto it = this->known_transactions_.find(transactionID);
if (it == known_transactions_.end()) {
return false;
@@ -242,7 +242,7 @@
// time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
uint64_t crcValue = transaction->getCRC();
std::string crc = std::to_string(crcValue);
- logger_->log_debug("Site2Site Receive confirm with CRC %llu to transaction %s", crcValue, transactionID);
+ logger_->log_debug("Site2Site Receive confirm with CRC %llu to transaction %s", crcValue, transactionID.to_string());
ret = writeResponse(transaction, CONFIRM_TRANSACTION, crc);
if (ret <= 0)
return false;
@@ -253,18 +253,18 @@
return false;
if (code == CONFIRM_TRANSACTION) {
- logger_->log_debug("Site2Site transaction %s peer confirm transaction", transactionID);
+ logger_->log_debug("Site2Site transaction %s peer confirm transaction", transactionID.to_string());
transaction->_state = TRANSACTION_CONFIRMED;
return true;
} else if (code == BAD_CHECKSUM) {
- logger_->log_debug("Site2Site transaction %s peer indicate bad checksum", transactionID);
+ logger_->log_debug("Site2Site transaction %s peer indicate bad checksum", transactionID.to_string());
return false;
} else {
- logger_->log_debug("Site2Site transaction %s peer unknown response code %d", transactionID, code);
+ logger_->log_debug("Site2Site transaction %s peer unknown response code %d", transactionID.to_string(), code);
return false;
}
} else {
- logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID);
+ logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.to_string());
ret = writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION");
if (ret <= 0) {
return false;
@@ -275,19 +275,19 @@
// we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
if (code == CONFIRM_TRANSACTION) {
- logger_->log_debug("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID, message);
+ logger_->log_debug("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.to_string(), message);
if (this->_currentVersion > 3) {
uint64_t crcValue = transaction->getCRC();
std::string crc = std::to_string(crcValue);
if (message == crc) {
- logger_->log_debug("Site2Site transaction %s CRC matched", transactionID);
+ logger_->log_debug("Site2Site transaction %s CRC matched", transactionID.to_string());
ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
if (ret <= 0)
return false;
transaction->_state = TRANSACTION_CONFIRMED;
return true;
} else {
- logger_->log_debug("Site2Site transaction %s CRC not matched %s", transactionID, crc);
+ logger_->log_debug("Site2Site transaction %s CRC not matched %s", transactionID.to_string(), crc);
ret = writeResponse(transaction, BAD_CHECKSUM, "BAD_CHECKSUM");
return false;
}
@@ -298,21 +298,21 @@
transaction->_state = TRANSACTION_CONFIRMED;
return true;
} else {
- logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID, code);
+ logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID.to_string(), code);
return false;
}
return false;
}
}
-void SiteToSiteClient::cancel(std::string transactionID) {
+void SiteToSiteClient::cancel(const utils::Identifier& transactionID) {
std::shared_ptr<Transaction> transaction = NULL;
if (peer_state_ != READY) {
return;
}
- std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+ auto it = this->known_transactions_.find(transactionID);
if (it == known_transactions_.end()) {
return;
@@ -331,10 +331,10 @@
return;
}
-void SiteToSiteClient::error(std::string transactionID) {
+void SiteToSiteClient::error(const utils::Identifier& transactionID) {
std::shared_ptr<Transaction> transaction = NULL;
- std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+ auto it = this->known_transactions_.find(transactionID);
if (it == known_transactions_.end()) {
return;
@@ -348,7 +348,7 @@
}
// Complete the transaction
-bool SiteToSiteClient::complete(std::string transactionID) {
+bool SiteToSiteClient::complete(const utils::Identifier& transactionID) {
int ret;
std::shared_ptr<Transaction> transaction = NULL;
@@ -376,7 +376,7 @@
transaction->_state = TRANSACTION_COMPLETED;
return true;
} else {
- logger_->log_debug("Site2Site transaction %s receive finished", transactionID);
+ logger_->log_debug("Site2Site transaction %s receive finished", transactionID.to_string());
ret = this->writeResponse(transaction, TRANSACTION_FINISHED, "Finished");
if (ret <= 0) {
return false;
@@ -396,17 +396,17 @@
return false;
if (code == TRANSACTION_FINISHED) {
- logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID);
+ logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.to_string());
transaction->_state = TRANSACTION_COMPLETED;
return true;
} else {
- logger_->log_warn("Site2Site transaction %s peer unknown respond code %d", transactionID, code);
+ logger_->log_warn("Site2Site transaction %s peer unknown respond code %d", transactionID.to_string(), code);
return false;
}
}
}
-int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const std::shared_ptr<core::ProcessSession> &session) {
+int16_t SiteToSiteClient::send(const utils::Identifier& transactionID, DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const std::shared_ptr<core::ProcessSession> &session) {
int ret;
if (peer_state_ != READY) {
@@ -417,7 +417,7 @@
return -1;
}
- std::map<std::string, std::shared_ptr<Transaction> >::iterator it = this->known_transactions_.find(transactionID);
+ auto it = this->known_transactions_.find(transactionID);
if (it == known_transactions_.end()) {
return -1;
@@ -425,12 +425,12 @@
std::shared_ptr<Transaction> transaction = it->second;
if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
- logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID);
+ logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID.to_string());
return -1;
}
if (transaction->getDirection() != SEND) {
- logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID);
+ logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID.to_string());
return -1;
}
@@ -458,7 +458,7 @@
if (ret <= 0) {
return -1;
}
- logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID, itAttribute->first, itAttribute->second);
+ logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID.to_string(), itAttribute->first, itAttribute->second);
}
bool flowfile_has_content = (flowFile != nullptr);
@@ -518,13 +518,13 @@
transaction->_state = DATA_EXCHANGED;
transaction->_bytes += len;
- logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID << " sent flow " << transaction->total_transfers_
+ logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID.to_string() << " sent flow " << transaction->total_transfers_
<< "flow records, with total size " << transaction->_bytes;
return 0;
}
-bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bool &eof) {
+bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
int ret;
std::shared_ptr<Transaction> transaction = NULL;
@@ -545,12 +545,12 @@
transaction = it->second;
if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) {
- logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID);
+ logger_->log_warn("Site2Site transaction %s is not at started or exchanged state", transactionID.to_string());
return false;
}
if (transaction->getDirection() != RECEIVE) {
- logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID);
+ logger_->log_warn("Site2Site transaction %s direction is wrong", transactionID.to_string());
return false;
}
@@ -570,15 +570,15 @@
return false;
}
if (code == CONTINUE_TRANSACTION) {
- logger_->log_debug("Site2Site transaction %s peer indicate continue transaction", transactionID);
+ logger_->log_debug("Site2Site transaction %s peer indicate continue transaction", transactionID.to_string());
transaction->_dataAvailable = true;
} else if (code == FINISH_TRANSACTION) {
- logger_->log_debug("Site2Site transaction %s peer indicate finish transaction", transactionID);
+ logger_->log_debug("Site2Site transaction %s peer indicate finish transaction", transactionID.to_string());
transaction->_dataAvailable = false;
eof = true;
return true;
} else {
- logger_->log_debug("Site2Site transaction %s peer indicate wrong respond code %d", transactionID, code);
+ logger_->log_debug("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.to_string(), code);
return false;
}
}
@@ -597,7 +597,7 @@
}
// read the attributes
- logger_->log_debug("Site2Site transaction %s receives attribute key %d", transactionID, numAttributes);
+ logger_->log_debug("Site2Site transaction %s receives attribute key %d", transactionID.to_string(), numAttributes);
for (unsigned int i = 0; i < numAttributes; i++) {
std::string key;
std::string value;
@@ -610,7 +610,7 @@
return false;
}
packet->_attributes[key] = value;
- logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID, key, value);
+ logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value);
}
uint64_t len;
@@ -624,14 +624,14 @@
transaction->current_transfers_++;
transaction->total_transfers_++;
} else {
- logger_->log_warn("Site2Site transaction %s empty flow file without attribute", transactionID);
+ logger_->log_warn("Site2Site transaction %s empty flow file without attribute", transactionID.to_string());
transaction->_dataAvailable = false;
eof = true;
return true;
}
transaction->_state = DATA_EXCHANGED;
transaction->_bytes += len;
- logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID << " received flow record " << transaction->total_transfers_
+ logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID.to_string() << " received flow record " << transaction->total_transfers_
<< ", total length " << transaction->_bytes << ", added " << len;
return true;
@@ -655,8 +655,7 @@
}
// Create the transaction
- std::string transactionID;
- transaction = createTransaction(transactionID, RECEIVE);
+ transaction = createTransaction(RECEIVE);
if (transaction == NULL) {
context->yield();
@@ -664,6 +663,8 @@
throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
}
+ utils::Identifier transactionID = transaction->getUUID();
+
try {
while (true) {
std::map<std::string, std::string> empty;
@@ -673,7 +674,7 @@
bool eof = false;
if (!receive(transactionID, &packet, eof)) {
- throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transactionID);
+ throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transactionID.to_string());
}
if (eof) {
// transaction done
@@ -719,10 +720,10 @@
}
if (!complete(transactionID)) {
std::stringstream transaction_str;
- transaction_str << "Complete Transaction " << transactionID << " Failed";
+ transaction_str << "Complete Transaction " << transactionID.to_string() << " Failed";
throw Exception(SITE2SITE_EXCEPTION, transaction_str.str());
}
- logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID << " received flow record " << transfers
+ logging::LOG_INFO(logger_) << "Site to Site transaction " << transactionID.to_string() << " received flow record " << transfers
<< ", with content size " << bytes << " bytes";
// we yield the receive if we did not get anything
if (transfers == 0)
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 99f2912..1628f62 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -194,7 +194,7 @@
}
template<typename T>
-void ThreadPool<T>::stopTasks(const std::string &identifier) {
+void ThreadPool<T>::stopTasks(const TaskId &identifier) {
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
task_status_[identifier] = false;
}
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index cc4834e..541cbe3 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -78,7 +78,7 @@
processor->initialize();
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
- processor_mapping_[processor->getUUIDStr()] = processor;
+ processor_mapping_[processor->getUUID()] = processor;
if (!linkToPrevious) {
termination_ = *(relationships.begin());
@@ -103,11 +103,8 @@
connection->setSource(last);
connection->setDestination(processor);
- utils::Identifier uuid_copy, uuid_copy_next;
- last->getUUID(uuid_copy);
- connection->setSourceUUID(uuid_copy);
- processor->getUUID(uuid_copy_next);
- connection->setDestinationUUID(uuid_copy_next);
+ connection->setSourceUUID(last->getUUID());
+ connection->setDestinationUUID(processor->getUUID());
last->addConnection(connection);
if (last != processor) {
processor->addConnection(connection);
@@ -172,11 +169,8 @@
connection->setSource(source_proc);
connection->setDestination(destination_proc);
- utils::Identifier uuid_copy_src, uuid_copy_dest;
- source_proc->getUUID(uuid_copy_src);
- connection->setSourceUUID(uuid_copy_src);
- destination_proc->getUUID(uuid_copy_dest);
- connection->setDestinationUUID(uuid_copy_dest);
+ connection->setSourceUUID(source_proc->getUUID());
+ connection->setDestinationUUID(destination_proc->getUUID());
source_proc->addConnection(connection);
if (source_proc != destination_proc) {
destination_proc->addConnection(connection);
@@ -335,8 +329,7 @@
if (setDest)
connection->setDestination(processor);
- utils::Identifier uuid_copy;
- last->getUUID(uuid_copy);
+ utils::Identifier uuid_copy = last->getUUID();
connection->setSourceUUID(uuid_copy);
if (setDest)
connection->setDestinationUUID(uuid_copy);
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 0e11aea..5ab5615 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -330,7 +330,7 @@
std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> controller_service_nodes_;
- std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+ std::map<utils::Identifier, std::shared_ptr<core::Processor>> processor_mapping_;
std::vector<std::shared_ptr<core::Processor>> processor_queue_;
std::vector<std::shared_ptr<core::Processor>> configured_processors_;
std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index cd9c08c..446b71b 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -121,8 +121,8 @@
processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
diff --git a/libminifi/test/archive-tests/FocusArchiveTests.cpp b/libminifi/test/archive-tests/FocusArchiveTests.cpp
index 6e15d43..9435bf6 100644
--- a/libminifi/test/archive-tests/FocusArchiveTests.cpp
+++ b/libminifi/test/archive-tests/FocusArchiveTests.cpp
@@ -53,8 +53,7 @@
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::UnfocusArchiveEntry>("processorname");
REQUIRE(processor->getName() == "processorname");
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ REQUIRE(processor->getUUID());
}
TEST_CASE("FocusArchive", "[testFocusArchive]") {
diff --git a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
index c74a173..0657c9f 100644
--- a/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
+++ b/libminifi/test/archive-tests/ManipulateArchiveTests.cpp
@@ -107,8 +107,7 @@
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ManipulateArchive>("processorname");
REQUIRE(processor->getName() == "processorname");
- utils::Identifier processoruuid;
- REQUIRE(true == processor->getUUID(processoruuid));
+ REQUIRE(processor->getUUID());
}
TEST_CASE("Test ManipulateArchive Touch", "[testManipulateArchiveTouch]") {
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 4f3bdc0..d8d2bed 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -154,11 +154,11 @@
processor = std::make_shared<processors::MergeContent>("mergecontent");
processor->initialize();
- utils::Identifier processoruuid;
- REQUIRE(processor->getUUID(processoruuid));
+ utils::Identifier processoruuid = processor->getUUID();
+ REQUIRE(processoruuid);
std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<minifi::processors::LogAttribute>("logattribute");
- utils::Identifier logAttributeuuid;
- REQUIRE(logAttributeProcessor->getUUID(logAttributeuuid));
+ utils::Identifier logAttributeuuid = logAttributeProcessor->getUUID();
+ REQUIRE(logAttributeuuid);
// output from merge processor to log attribute
output = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
diff --git a/libminifi/test/bustache-tests/ApplyTemplateTests.cpp b/libminifi/test/bustache-tests/ApplyTemplateTests.cpp
index 9899d13..93ff71e 100644
--- a/libminifi/test/bustache-tests/ApplyTemplateTests.cpp
+++ b/libminifi/test/bustache-tests/ApplyTemplateTests.cpp
@@ -49,8 +49,7 @@
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ApplyTemplate>("processorname");
REQUIRE(processor->getName() == "processorname");
- utils::Identifier processoruuid;
- REQUIRE(processor->getUUID(processoruuid));
+ REQUIRE(processor->getUUID());
}
TEST_CASE("Test usage of ApplyTemplate", "[ApplyTemplateTest]") {
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
index 4ad0ece..dbba9a9 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -58,8 +58,7 @@
content_repo->initialize(config);
auto processor = std::make_shared<core::Processor>("dummy");
- utils::Identifier uuid;
- processor->getUUID(uuid);
+ utils::Identifier uuid = processor->getUUID();
auto output = std::make_shared<minifi::Connection>(ff_repository, content_repo, "output");
output->addRelationship({"out", ""});
output->setSourceUUID(uuid);
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
index 958e53b..61e6b63 100644
--- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -39,7 +39,7 @@
TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
- std::string eventId = record1.getEventId();
+ utils::Identifier eventId = record1.getEventId();
std::string smileyface = ":)";
record1.setDetails(smileyface);
@@ -62,7 +62,7 @@
TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
- std::string eventId = record1.getEventId();
+ utils::Identifier eventId = record1.getEventId();
std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>();
ffr1->setAttribute("potato", "potatoe");
ffr1->setAttribute("tomato", "tomatoe");
@@ -79,8 +79,8 @@
REQUIRE(record2.DeSerialize(testRepository) == true);
REQUIRE(record1.getChildrenUuids().size() == 1);
REQUIRE(record2.getChildrenUuids().size() == 1);
- std::string childId = record2.getChildrenUuids().at(0);
- REQUIRE(childId == ffr1->getUUIDStr());
+ utils::Identifier childId = record2.getChildrenUuids().at(0);
+ REQUIRE(childId == ffr1->getUUID());
record2.removeChildUuid(childId);
REQUIRE(record2.getChildrenUuids().size() == 0);
}
@@ -88,7 +88,7 @@
TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
- std::string eventId = record1.getEventId();
+ utils::Identifier eventId = record1.getEventId();
std::string smileyface = ":)";
record1.setDetails(smileyface);
@@ -113,7 +113,7 @@
TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
- std::string eventId = record1.getEventId();
+ utils::Identifier eventId = record1.getEventId();
std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>();
ffr1->setAttribute("potato", "potatoe");
ffr1->setAttribute("tomato", "tomatoe");
@@ -131,8 +131,8 @@
REQUIRE(record2.DeSerialize(testRepository) == true);
REQUIRE(record1.getChildrenUuids().size() == 1);
REQUIRE(record2.getChildrenUuids().size() == 1);
- std::string childId = record2.getChildrenUuids().at(0);
- REQUIRE(childId == ffr1->getUUIDStr());
+ utils::Identifier childId = record2.getChildrenUuids().at(0);
+ REQUIRE(childId == ffr1->getUUID());
record2.removeChildUuid(childId);
REQUIRE(record2.getChildrenUuids().size() == 0);
}
@@ -140,7 +140,7 @@
TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
- std::string eventId = record1.getEventId();
+ utils::Identifier eventId = record1.getEventId();
std::string smileyface = ":)";
record1.setDetails(smileyface);
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 9fbee8c..d2edaa3 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -292,8 +292,8 @@
*/
{
std::shared_ptr<core::Processor> processor = std::make_shared<core::Processor>("dummy");
- utils::Identifier uuid;
- REQUIRE(processor->getUUID(uuid));
+ utils::Identifier uuid = processor->getUUID();
+ REQUIRE(uuid);
input->setSourceUUID(uuid);
processor->addConnection(input);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp
index 188f515..504822b 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -38,8 +38,9 @@
return "TestStateController";
}
- virtual std::string getComponentUUID() const {
- return "uuid";
+ virtual utils::Identifier getComponentUUID() const {
+ static auto dummyUUID = utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+ return dummyUUID;
}
/**
@@ -92,8 +93,9 @@
return "TestUpdateSink";
}
- virtual std::string getComponentUUID() const {
- return "uuid";
+ virtual utils::Identifier getComponentUUID() const {
+ static auto dummyUUID = utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
+ return dummyUUID;
}
/**
* Start the client
diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp
index cd53528..1ad41c8 100644
--- a/libminifi/test/unit/Site2SiteTests.cpp
+++ b/libminifi/test/unit/Site2SiteTests.cpp
@@ -36,36 +36,11 @@
minifi::sitetosite::RawSiteToSiteClient protocol(std::move(peer));
- std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538";
-
- utils::Identifier fakeUUID;
-
- fakeUUID = uuid_str;
+ utils::Identifier fakeUUID = utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
protocol.setPortId(fakeUUID);
- REQUIRE(uuid_str == protocol.getPortId());
-}
-
-TEST_CASE("TestSetPortIdUppercase", "[S2S2]") {
- std::unique_ptr<minifi::sitetosite::SiteToSitePeer> peer = std::unique_ptr<minifi::sitetosite::SiteToSitePeer>(
- new minifi::sitetosite::SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BufferStream>(new org::apache::nifi::minifi::io::BufferStream()), "fake_host", 65433, ""));
-
- minifi::sitetosite::RawSiteToSiteClient protocol(std::move(peer));
-
- std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
-
- utils::Identifier fakeUUID;
-
- fakeUUID = uuid_str;
-
- protocol.setPortId(fakeUUID);
-
- REQUIRE(uuid_str != protocol.getPortId());
-
- std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(), ::tolower);
-
- REQUIRE(uuid_str == protocol.getPortId());
+ REQUIRE(fakeUUID == protocol.getPortId());
}
void sunny_path_bootstrap(SiteToSiteResponder *collector) {
@@ -98,11 +73,7 @@
minifi::sitetosite::RawSiteToSiteClient protocol(std::move(peer));
- std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
-
- utils::Identifier fakeUUID;
-
- fakeUUID = uuid_str;
+ utils::Identifier fakeUUID = utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
protocol.setPortId(fakeUUID);
@@ -137,10 +108,11 @@
// start to send the stuff
// Create the transaction
- std::string transactionID;
std::string payload = "Test MiNiFi payload";
std::shared_ptr<minifi::sitetosite::Transaction> transaction;
- transaction = protocol.createTransaction(transactionID, minifi::sitetosite::SEND);
+ transaction = protocol.createTransaction(minifi::sitetosite::SEND);
+ REQUIRE(transaction);
+ auto transactionID = transaction->getUUID();
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES");
std::map<std::string, std::string> attributes;
diff --git a/nanofi/include/cxx/Plan.h b/nanofi/include/cxx/Plan.h
index e75b7e9..ecdede6 100644
--- a/nanofi/include/cxx/Plan.h
+++ b/nanofi/include/cxx/Plan.h
@@ -108,7 +108,7 @@
std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
bool linkToPrevious = false);
- bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);
+ bool setProperty(const std::shared_ptr<core::Processor>& proc, const std::string &prop, const std::string &value);
void reset();
@@ -141,7 +141,7 @@
}
void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
- next_ff_ = ptr;
+ next_ff_ = std::move(ptr);
}
bool hasProcessor() {
@@ -154,16 +154,16 @@
std::function<void(core::ProcessSession*, core::ProcessContext *)> ontrigger_callback,
std::function<void(core::ProcessContext *)> onschedule_callback = {});
- static std::shared_ptr<ExecutionPlan> getPlan(const std::string& uuid) {
+ static std::shared_ptr<ExecutionPlan> getPlan(const utils::Identifier& uuid) {
auto it = proc_plan_map_.find(uuid);
return it != proc_plan_map_.end() ? it->second : nullptr;
}
- static void addProcessorWithPlan(const std::string &uuid, std::shared_ptr<ExecutionPlan> plan) {
- proc_plan_map_[uuid] = plan;
+ static void addProcessorWithPlan(const utils::Identifier &uuid, std::shared_ptr<ExecutionPlan> plan) {
+ proc_plan_map_[uuid] = std::move(plan);
}
- static bool removeProcWithPlan(const std::string& uuid) {
+ static bool removeProcWithPlan(const utils::Identifier& uuid) {
return proc_plan_map_.erase(uuid) > 0;
}
@@ -221,7 +221,7 @@
std::shared_ptr<core::ProcessSession> current_session_;
std::shared_ptr<core::FlowFile> current_flowfile_;
- std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
+ std::map<utils::Identifier, std::shared_ptr<core::Processor>> processor_mapping_;
std::vector<std::shared_ptr<core::Processor>> processor_queue_;
std::vector<std::shared_ptr<core::Processor>> configured_processors_;
std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
@@ -238,7 +238,7 @@
static std::shared_ptr<utils::IdGenerator> id_generator_;
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<FailureHandler> failure_handler_;
- static std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> proc_plan_map_;
+ static std::map<utils::Identifier, std::shared_ptr<ExecutionPlan>> proc_plan_map_;
static std::map<std::string, custom_processor_args> custom_processors;
};
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index a53a27f..326759d 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -139,14 +139,14 @@
}
if (instance == NULL) {
nifi_port port;
- std::string port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
+ auto port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
port.port_id = const_cast<char*>(port_str.c_str());
instance = create_instance("internal_standalone", &port);
}
auto flow = create_new_flow(instance);
std::shared_ptr<ExecutionPlan> plan(flow);
plan->addProcessor(ptr, name);
- ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
+ ExecutionPlan::addProcessorWithPlan(ptr->getUUID(), plan);
return static_cast<standalone_processor*>(ptr.get());
}
@@ -180,7 +180,7 @@
void free_standalone_processor(standalone_processor* proc) {
NULL_CHECK(, proc);
- ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
+ ExecutionPlan::removeProcWithPlan(proc->getUUID());
if (ExecutionPlan::getProcWithPlanQty() == 0) {
// The instance is not needed any more as there are no standalone processors in the system
@@ -288,7 +288,7 @@
}
ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(ctx->getContentRepository()));
- auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr());
+ auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUID());
if (!plan) {
return nullptr;
@@ -690,7 +690,7 @@
if (proc == nullptr) {
return nullptr;
}
- auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+ auto plan = ExecutionPlan::getPlan(proc->getUUID());
if (!plan) {
// This is not a standalone processor, shouldn't be used with invoke!
return nullptr;
@@ -726,7 +726,7 @@
return nullptr;
}
- auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
+ auto plan = ExecutionPlan::getPlan(proc->getUUID());
if (!plan) {
// This is not a standalone processor, shouldn't be used with invoke!
return nullptr;
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index 56d77b1..9ccfcc8 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -24,7 +24,7 @@
#include <string>
std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator();
-std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> ExecutionPlan::proc_plan_map_ = {};
+std::map<utils::Identifier, std::shared_ptr<ExecutionPlan>> ExecutionPlan::proc_plan_map_ = {};
std::map<std::string, custom_processor_args> ExecutionPlan::custom_processors = {};
ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo)
@@ -68,7 +68,7 @@
return addProcessor(proc, CallbackProcessorName, core::Relationship("success", "description"), true);
}
-bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) {
+bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor>& proc, const std::string &prop, const std::string &value) {
uint32_t i = 0;
logger_->log_debug("Attempting to set property %s %s for %s", prop, value, proc->getName());
for (i = 0; i < processor_queue_.size(); i++) {
@@ -95,7 +95,7 @@
// initialize the processor
processor->initialize();
- processor_mapping_[processor->getUUIDStr()] = processor;
+ processor_mapping_[processor->getUUID()] = processor;
if (!linkToPrevious) {
termination_ = relationship;
@@ -303,13 +303,10 @@
// link the connections so that we can test results at the end for this
connection->setSource(src_proc);
- utils::Identifier uuid_copy, uuid_copy_next;
- src_proc->getUUID(uuid_copy);
- connection->setSourceUUID(uuid_copy);
+ connection->setSourceUUID(src_proc->getUUID());
if (set_dst) {
connection->setDestination(dst_proc);
- dst_proc->getUUID(uuid_copy_next);
- connection->setDestinationUUID(uuid_copy_next);
+ connection->setDestinationUUID(dst_proc->getUUID());
if (src_proc != dst_proc) {
dst_proc->addConnection(connection);
}
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index ad8f594..4edde9b 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -27,7 +27,7 @@
static nifi_instance *create_instance_obj(const char *name = "random_instance") {
nifi_port port;
- std::string port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
+ auto port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
port.port_id = const_cast<char*>(port_str.c_str());
return create_instance_repo("random_instance", &port, "volatilerepository");
}
diff --git a/nanofi/tests/CTestsBase.h b/nanofi/tests/CTestsBase.h
index ac9cddc..b3c0767 100644
--- a/nanofi/tests/CTestsBase.h
+++ b/nanofi/tests/CTestsBase.h
@@ -89,7 +89,7 @@
class TailFileTestResourceManager {
public:
TailFileTestResourceManager(const std::string& processor_name, void(*callback)(processor_session * ps, processor_context * ctx)) {
- std::string port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
+ auto port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
nifi_port port;
port.port_id = const_cast<char*>(port_str.c_str());
const char * instance_str = "nifi";