MINIFICPP-1300 - Extract conditional reload for the schedulers of the FlowController
MINIFICPP-1300 - Decouple the ownership models of components using a ControllerServiceProvider (eg. FlowController)
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #858
diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp b/extensions/coap/protocols/CoapC2Protocol.cpp
index b9d195e..f51d5a3 100644
--- a/extensions/coap/protocols/CoapC2Protocol.cpp
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -39,7 +39,7 @@
CoapProtocol::~CoapProtocol() = default;
-void CoapProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+void CoapProtocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
RESTSender::initialize(controller, configure);
if (configure->get("nifi.c2.coap.connector.service", controller_service_name_)) {
auto service = controller->getControllerService(controller_service_name_);
diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
index a2a702c..ce74f35 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -84,7 +84,7 @@
// no op.
}
- void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+ void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
// Supported Properties
diff --git a/extensions/expression-language/ProcessContextExpr.h b/extensions/expression-language/ProcessContextExpr.h
index 4d0db54..191859f 100644
--- a/extensions/expression-language/ProcessContextExpr.h
+++ b/extensions/expression-language/ProcessContextExpr.h
@@ -37,14 +37,14 @@
/**
std::forward of argument list did not work on all platform.
**/
- ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider,
+ ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, controller::ControllerServiceProvider* controller_service_provider,
const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_repo,
const std::shared_ptr<core::ContentRepository> &content_repo = std::make_shared<core::repository::FileSystemRepository>())
: core::ProcessContext(processor, controller_service_provider, repo, flow_repo, content_repo),
logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) {
}
- ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider,
+ ProcessContextExpr(const std::shared_ptr<ProcessorNode> &processor, controller::ControllerServiceProvider* controller_service_provider,
const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<minifi::Configure> &configuration,
const std::shared_ptr<core::ContentRepository> &content_repo = std::make_shared<core::repository::FileSystemRepository>())
: core::ProcessContext(processor, controller_service_provider, repo, flow_repo, configuration, content_repo),
diff --git a/extensions/http-curl/protocols/AgentPrinter.cpp b/extensions/http-curl/protocols/AgentPrinter.cpp
index b22331e..672652a 100644
--- a/extensions/http-curl/protocols/AgentPrinter.cpp
+++ b/extensions/http-curl/protocols/AgentPrinter.cpp
@@ -36,7 +36,7 @@
logger_(logging::LoggerFactory<AgentPrinter>::getLogger()) {
}
-void AgentPrinter::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+void AgentPrinter::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure) {
HeartBeatReporter::initialize(controller, updateSink, configure);
}
diff --git a/extensions/http-curl/protocols/AgentPrinter.h b/extensions/http-curl/protocols/AgentPrinter.h
index 50000fe..83c8353 100644
--- a/extensions/http-curl/protocols/AgentPrinter.h
+++ b/extensions/http-curl/protocols/AgentPrinter.h
@@ -45,7 +45,7 @@
/**
* Initialize agent printer.
*/
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure) override;
/**
diff --git a/extensions/http-curl/protocols/RESTReceiver.cpp b/extensions/http-curl/protocols/RESTReceiver.cpp
index ae1ebf6..10557e7 100644
--- a/extensions/http-curl/protocols/RESTReceiver.cpp
+++ b/extensions/http-curl/protocols/RESTReceiver.cpp
@@ -44,7 +44,7 @@
logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) {
}
-void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure) {
+void RESTReceiver::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure) {
HeartBeatReporter::initialize(controller, updateSink, configure);
logger_->log_trace("Initializing rest receiver");
if (nullptr != configuration_) {
diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h
index 9c78a83..4bd6d51 100644
--- a/extensions/http-curl/protocols/RESTReceiver.h
+++ b/extensions/http-curl/protocols/RESTReceiver.h
@@ -48,7 +48,7 @@
public:
RESTReceiver(std::string name, utils::Identifier uuid = utils::Identifier());
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure) override;
virtual int16_t heartbeat(const C2Payload &heartbeat) override;
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index 160f048..649910e 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -41,7 +41,7 @@
logger_(logging::LoggerFactory<Connectable>::getLogger()) {
}
-void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+void RESTSender::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
C2Protocol::initialize(controller, configure);
// base URL when one is not specified.
if (nullptr != configure) {
diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h
index 40a81f2..211716c 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -56,7 +56,7 @@
virtual void update(const std::shared_ptr<Configure> &configure) override;
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
protected:
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 48e6710..93e1091 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -89,9 +89,8 @@
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
@@ -211,9 +210,8 @@
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680");
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.cpp b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
index 9637e20..2dcdf68 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.cpp
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.cpp
@@ -30,7 +30,7 @@
MQTTC2Protocol::~MQTTC2Protocol() = default;
-void MQTTC2Protocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+void MQTTC2Protocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
if (configure->get("nifi.c2.mqtt.connector.service", controller_service_name_)) {
auto service = controller->getControllerService(controller_service_name_);
mqtt_service_ = std::static_pointer_cast<controllers::MQTTControllerService>(service);
diff --git a/extensions/mqtt/protocol/MQTTC2Protocol.h b/extensions/mqtt/protocol/MQTTC2Protocol.h
index 47e04a6..1497b10 100644
--- a/extensions/mqtt/protocol/MQTTC2Protocol.h
+++ b/extensions/mqtt/protocol/MQTTC2Protocol.h
@@ -64,7 +64,7 @@
// no op.
}
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override;
protected:
diff --git a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
index 138e610..fa597ce 100644
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
@@ -82,16 +82,14 @@
std::vector<std::thread> processor_workers;
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo, test_repo);
+ std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, nullptr, test_repo, test_repo);
core::ProcessSessionFactory factory(contextset);
processor->onSchedule(contextset.get(), &factory);
for (int i = 0; i < 1; i++) {
processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() {
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo, test_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, test_repo, test_repo);
context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5");
std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context);
while (!is_ready.load(std::memory_order_relaxed)) {
diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
index 10d5f77..5f86a51 100644
--- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp
@@ -95,9 +95,8 @@
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec");
auto session = std::make_shared<core::ProcessSession>(context);
@@ -206,9 +205,8 @@
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec");
// we're using new lines above
@@ -329,9 +327,8 @@
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(logAttribute);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
- std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+ std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::EndpointList, org::apache::nifi::minifi::io::Socket::getMyHostName() + ":" + std::to_string(server.getPort()));
context->setProperty(org::apache::nifi::minifi::processors::GetTCP::ReconnectInterval, "100 msec");
// we're using new lines above
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index a251b4e..a99d8b3 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -83,8 +83,7 @@
REQUIRE(!dir.empty());
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
// replicate 10 threads
@@ -169,8 +168,7 @@
REQUIRE(!dir.empty());
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
context->setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir);
// replicate 10 threads
@@ -265,8 +263,7 @@
processor->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -574,8 +571,7 @@
REQUIRE(rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host));
rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(rpg);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
auto psf = std::make_shared<core::ProcessSessionFactory>(context);
if (hasException) {
auto expected_error = "Site2Site Protocol: HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule";
diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h
index 31ac37a..3f1e7e0 100644
--- a/libminifi/include/CronDrivenSchedulingAgent.h
+++ b/libminifi/include/CronDrivenSchedulingAgent.h
@@ -44,7 +44,7 @@
/*!
* Create a new event driven scheduling agent.
*/
- CronDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
+ CronDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index 65d25bb..d56e929 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -43,7 +43,7 @@
/*!
* Create a new event driven scheduling agent.
*/
- EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
+ EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) {
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index c40c573..cd1804e 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -235,7 +235,7 @@
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it does not exist.
*/
- std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode(const std::string &id) override;
+ std::shared_ptr<core::controller::ControllerServiceNode> getControllerServiceNode(const std::string &id) const override;
void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override;
@@ -337,6 +337,15 @@
utils::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
+ private:
+ template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type>
+ void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool condition) {
+ if (condition) {
+ scheduler = std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
+ }
+ }
+
+ protected:
// flow controller mutex
std::recursive_mutex mutex_;
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 088c3e1..fb5532d 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -59,7 +59,7 @@
/*!
* Create a new scheduling agent.
*/
- SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
+ SchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: admin_yield_duration_(),
bored_yield_duration_(0),
@@ -135,7 +135,7 @@
// thread pool for components.
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_;
// controller service provider reference
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider_;
+ gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_;
private:
struct SchedulingInfo {
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 8a35580..2e8edda 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -47,8 +47,9 @@
/*!
* Create a new threaded scheduling agent.
*/
- ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo,
- std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
+ ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
+ std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo,
+ std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool),
logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
}
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h
index 160db1d..1481c18 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -39,7 +39,7 @@
/*!
* Create a new processor
*/
- TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo,
+ TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo,
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure,
utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool)
: ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool),
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index 273fd43..edcf096 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -62,7 +62,7 @@
*/
class C2Agent : public state::UpdateController {
public:
- C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+ C2Agent(core::controller::ControllerServiceProvider* controller,
const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure);
virtual ~C2Agent() noexcept {
@@ -210,7 +210,7 @@
std::shared_ptr<controllers::UpdatePolicyControllerService> update_service_;
// controller service provider reference.
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ core::controller::ControllerServiceProvider* controller_;
// shared pointer to the configuration of this agent
std::shared_ptr<Configure> configuration_;
diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h
index 2cbd0bd..23e7748 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -41,7 +41,7 @@
running_(true) {
}
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) {
controller_ = controller;
configuration_ = configure;
}
@@ -102,7 +102,7 @@
protected:
std::atomic<bool> running_;
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ core::controller::ControllerServiceProvider* controller_;
std::shared_ptr<Configure> configuration_;
};
diff --git a/libminifi/include/c2/ControllerSocketProtocol.h b/libminifi/include/c2/ControllerSocketProtocol.h
index d24c8fd..1636477 100644
--- a/libminifi/include/c2/ControllerSocketProtocol.h
+++ b/libminifi/include/c2/ControllerSocketProtocol.h
@@ -51,7 +51,7 @@
* @param updateSink update mechanism that will be used to stop/clear elements
* @param configuration configuration class.
*/
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration);
/**
diff --git a/libminifi/include/c2/HeartBeatReporter.h b/libminifi/include/c2/HeartBeatReporter.h
index 9de242d..82d5f2a 100644
--- a/libminifi/include/c2/HeartBeatReporter.h
+++ b/libminifi/include/c2/HeartBeatReporter.h
@@ -45,7 +45,7 @@
configuration_(nullptr) {
}
- virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+ virtual void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configure) {
controller_ = controller;
update_sink_ = updateSink;
@@ -89,7 +89,7 @@
}
protected:
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_;
+ core::controller::ControllerServiceProvider* controller_;
std::shared_ptr<state::StateMonitor> update_sink_;
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 53d42aa..3e0da1a 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -55,7 +55,7 @@
/*!
* Create a new process context associated with the processor/controller service/state manager
*/
- ProcessContext(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+ ProcessContext(const std::shared_ptr<ProcessorNode> &processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository> &repo,
const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<core::ContentRepository> &content_repo = std::make_shared<core::repository::FileSystemRepository>())
: VariableRegistry(std::make_shared<minifi::Configure>()),
controller_service_provider_(controller_service_provider),
@@ -73,7 +73,7 @@
/*!
* Create a new process context associated with the processor/controller service/state manager
*/
- ProcessContext(const std::shared_ptr<ProcessorNode> &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, const std::shared_ptr<core::Repository> &repo,
+ ProcessContext(const std::shared_ptr<ProcessorNode> &processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository> &repo,
const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<minifi::Configure> &configuration, const std::shared_ptr<core::ContentRepository> &content_repo =
std::make_shared<core::repository::FileSystemRepository>())
: VariableRegistry(configuration),
@@ -172,9 +172,7 @@
* identifier
*/
std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) {
- if (controller_service_provider_ != nullptr)
- return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
- return nullptr;
+ return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr());
}
/**
@@ -228,7 +226,7 @@
}
static std::shared_ptr<core::CoreComponentStateManagerProvider> getOrCreateDefaultStateManagerProvider(
- std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+ controller::ControllerServiceProvider* controller_service_provider,
std::shared_ptr<minifi::Configure> configuration,
const char *base_path = "") {
static std::mutex mutex;
@@ -300,7 +298,7 @@
static std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider(
std::shared_ptr<logging::Logger> logger,
- std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+ controller::ControllerServiceProvider* controller_service_provider,
std::shared_ptr<minifi::Configure> configuration) {
if (controller_service_provider == nullptr) {
return nullptr;
@@ -330,7 +328,7 @@
}
// controller service provider.
- std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_;
+ controller::ControllerServiceProvider* controller_service_provider_;
// state manager provider
std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
// repository shared pointer.
diff --git a/libminifi/include/core/ProcessContextBuilder.h b/libminifi/include/core/ProcessContextBuilder.h
index 155ab41..281be79 100644
--- a/libminifi/include/core/ProcessContextBuilder.h
+++ b/libminifi/include/core/ProcessContextBuilder.h
@@ -62,7 +62,7 @@
virtual ~ProcessContextBuilder() = default;
- std::shared_ptr<ProcessContextBuilder> withProvider(const std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider);
+ std::shared_ptr<ProcessContextBuilder> withProvider(core::controller::ControllerServiceProvider* controller_service_provider);
std::shared_ptr<ProcessContextBuilder> withProvenanceRepository(const std::shared_ptr<core::Repository> &repo);
@@ -76,7 +76,7 @@
protected:
std::shared_ptr<minifi::Configure> configuration_;
- std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_;
+ core::controller::ControllerServiceProvider* controller_service_provider_;
std::shared_ptr<core::Repository> prov_repo_;
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 8628f94..428b977 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -82,7 +82,7 @@
* @param id controller service identifier.
* @return shared pointer to the controller service node.
*/
- virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode(const std::string &id) {
+ virtual std::shared_ptr<ControllerServiceNode> getControllerServiceNode(const std::string &id) const {
return controller_map_->getControllerServiceNode(id);
}
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e088628..01c9a1e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -82,10 +82,6 @@
flow_file_repo_(flow_file_repo),
controller_service_map_(std::make_shared<core::controller::ControllerServiceMap>()),
thread_pool_(2, false, nullptr, "Flowcontroller threadpool"),
- timer_scheduler_(nullptr),
- event_scheduler_(nullptr),
- cron_scheduler_(nullptr),
- controller_service_provider_(nullptr),
flow_configuration_(std::move(flow_configuration)),
configuration_(configure),
content_repo_(content_repo),
@@ -267,9 +263,9 @@
this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_);
}
// stop after we've attempted to stop the processors.
- this->timer_scheduler_->stop();
- this->event_scheduler_->stop();
- this->cron_scheduler_->stop();
+ timer_scheduler_->stop();
+ event_scheduler_->stop();
+ cron_scheduler_->stop();
thread_pool_.shutdown();
/* STOP! Before you change it, consider the following:
* -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
@@ -350,17 +346,10 @@
thread_pool_.setControllerServiceProvider(base_shared_ptr);
thread_pool_.start();
}
- if (nullptr == timer_scheduler_ || reload) {
- timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
- }
- if (nullptr == event_scheduler_ || reload) {
- event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
- }
-
- if (nullptr == cron_scheduler_ || reload) {
- cron_scheduler_ = std::make_shared<CronDrivenSchedulingAgent>(base_shared_ptr, provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_);
- }
+ conditionalReloadScheduler<TimerDrivenSchedulingAgent>(timer_scheduler_, !timer_scheduler_ || reload);
+ conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload);
+ conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload);
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
@@ -591,7 +580,7 @@
loadC2ResponseConfiguration();
if (!c2_initialized_) {
- c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+ c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(this,
std::dynamic_pointer_cast<FlowController>(shared_from_this()),
configuration_));
c2_agent_->start();
@@ -801,7 +790,7 @@
* @param id service identifier
* @return shared pointer to the controller service node or nullptr if it does not exist.
*/
-std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(const std::string &id) {
+std::shared_ptr<core::controller::ControllerServiceNode> FlowController::getControllerServiceNode(const std::string &id) const {
return controller_service_provider_->getControllerServiceNode(id);
}
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 0ea31d2..9c0085d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -47,7 +47,7 @@
namespace minifi {
namespace c2 {
-C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+C2Agent::C2Agent(core::controller::ControllerServiceProvider* controller,
const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration)
: heart_beat_period_(3000),
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 9b7162d..124490f 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -32,7 +32,7 @@
namespace minifi {
namespace c2 {
-void ControllerSocketProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+void ControllerSocketProtocol::initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration) {
HeartBeatReporter::initialize(controller, updateSink, configuration);
stream_factory_ = minifi::io::StreamFactory::getInstance(configuration);
diff --git a/libminifi/src/core/ProcessContextBuilder.cpp b/libminifi/src/core/ProcessContextBuilder.cpp
index b1b794c..2cba8b7 100644
--- a/libminifi/src/core/ProcessContextBuilder.cpp
+++ b/libminifi/src/core/ProcessContextBuilder.cpp
@@ -48,7 +48,7 @@
configuration_ = std::make_shared<minifi::Configure>();
}
-std::shared_ptr<ProcessContextBuilder> ProcessContextBuilder::withProvider(const std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider) {
+std::shared_ptr<ProcessContextBuilder> ProcessContextBuilder::withProvider(core::controller::ControllerServiceProvider* controller_service_provider) {
controller_service_provider_ = controller_service_provider;
return this->shared_from_this();
}
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 6f415eb..c897459 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -54,7 +54,7 @@
} else {
state_dir_ = state_dir;
}
- state_manager_provider_ = core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_, configuration_, state_dir_.c_str());
+ state_manager_provider_ = core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_.get(), configuration_, state_dir_.c_str());
}
TestPlan::~TestPlan() {
@@ -125,7 +125,7 @@
auto contextBuilder = core::ClassLoader::getDefaultClassLoader().instantiate<core::ProcessContextBuilder>("ProcessContextBuilder");
- contextBuilder = contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_)->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
+ contextBuilder = contextBuilder->withContentRepository(content_repo_)->withFlowFileRepository(flow_repo_)->withProvider(controller_services_provider_.get())->withProvenanceRepository(prov_repo_)->withConfiguration(configuration_);
auto context = contextBuilder->build(node);
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index bb31681..5944b41 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -144,8 +144,7 @@
processor_->setScheduledState(core::ScheduledState::RUNNING);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor_);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- context_ = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+ context_ = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
}
public:
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 1809d7c..6c1ec65 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -185,8 +185,7 @@
logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
- context = std::make_shared<core::ProcessContext>(node, controller_service_provider, repo, repo, content_repo);
+ context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
}
~MergeTestController() = default;
std::shared_ptr<core::ProcessContext> context;
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 3a24eef..ce22ef2 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -118,7 +118,7 @@
auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
state_dir = utils::file::FileUtils::create_temp_directory(state_dir_name_template);
- core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration, state_dir.c_str());
+ core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration, state_dir.c_str());
std::shared_ptr<core::ProcessGroup> pg(yaml_config.getRoot(test_file_location));
queryRootProcessGroup(pg);
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 466110d..4844729 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -41,20 +41,18 @@
TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
const std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
: ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
-
// setup processor
{
processor = processorGenerator(mainProcUUID());
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- processorContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+ processorContext = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo, ff_repository, content_repo);
}
// setup INPUT processor
{
inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
- inputContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo,
+ inputContext = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo,
ff_repository, content_repo);
}
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index b48ebdd..aa2ad84 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -298,8 +298,7 @@
input->setSourceUUID(uuid);
processor->addConnection(input);
std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
- auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+ auto context = std::make_shared<core::ProcessContext>(node, nullptr, prov_repo, ff_repository, content_repo);
core::ProcessSession sessionGenFlowFile(context);
std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
sessionGenFlowFile.importFrom(content, flow);
diff --git a/nanofi/include/cxx/C2CallbackAgent.h b/nanofi/include/cxx/C2CallbackAgent.h
index cc8a665..81198bb 100644
--- a/nanofi/include/cxx/C2CallbackAgent.h
+++ b/nanofi/include/cxx/C2CallbackAgent.h
@@ -47,7 +47,7 @@
public:
- explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+ explicit C2CallbackAgent(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
virtual ~C2CallbackAgent() = default;
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 0a594e4..c8347c7 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -101,13 +101,12 @@
}
void enableAsyncC2(C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
running_ = true;
if (server->type != C2_Server_Type::MQTT) {
configure_->set("c2.rest.url", server->url);
configure_->set("c2.rest.url.ack", server->ack_url);
}
- agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
+ agent_ = std::make_shared<c2::C2CallbackAgent>(nullptr, nullptr, configure_);
listener_thread_pool_.start();
registerUpdateListener(agent_, 1000);
agent_->setStopCallback(c1);
@@ -133,8 +132,7 @@
}
void transfer(const std::shared_ptr<FlowFileRecord> &ff, const std::shared_ptr<minifi::io::DataStream> &stream = nullptr) {
- std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
- auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, configure_, content_repo_);
+ auto processContext = std::make_shared<core::ProcessContext>(proc_node_, nullptr, no_op_repo_, no_op_repo_, configure_, content_repo_);
auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext);
rpg_->onSchedule(processContext, sessionFactory);
diff --git a/nanofi/src/cxx/C2CallbackAgent.cpp b/nanofi/src/cxx/C2CallbackAgent.cpp
index 3a2b0d1..b41a870 100644
--- a/nanofi/src/cxx/C2CallbackAgent.cpp
+++ b/nanofi/src/cxx/C2CallbackAgent.cpp
@@ -34,7 +34,7 @@
namespace minifi {
namespace c2 {
-C2CallbackAgent::C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+C2CallbackAgent::C2CallbackAgent(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<state::StateMonitor> &updateSink,
const std::shared_ptr<Configure> &configuration)
: C2Agent(controller, updateSink, configuration),
stop(nullptr),
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index 72dc162..e6a0211 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -115,7 +115,7 @@
processor_nodes_.push_back(node);
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_.get(), prov_repo_, flow_repo_, content_repo_);
processor_contexts_.push_back(context);
processor_queue_.push_back(processor);
@@ -228,7 +228,7 @@
processor_nodes_.push_back(node);
- std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_);
+ std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_.get(), prov_repo_, flow_repo_, content_repo_);
processor_contexts_.push_back(context);
processor_queue_.push_back(failure_proc);