MINIFICPP-2764 Controller Service CPP API
diff --git a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h new file mode 100644 index 0000000..b307643 --- /dev/null +++ b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <system_error>
+
+#include "minifi-c.h"
+#include "minifi-cpp/core/PropertyDefinition.h"
+#include "nonstd/expected.hpp"
+
+namespace org::apache::nifi::minifi::api::core {
+
+// C++ wrapper around the MinifiControllerServiceContext handle of the C API.
+// The wrapper only stores the pointer; it never releases it.
+class ControllerServiceContext {
+ public:
+  explicit ControllerServiceContext(MinifiControllerServiceContext* impl) : impl_(impl) {}
+
+  // Returns the value of the property called `name`, or an error code when the lookup fails.
+  nonstd::expected<std::string, std::error_code> getProperty(std::string_view name) const;
+
+  // Convenience overload that looks the property up by the name stored in `property_reference`.
+  nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference) const {
+    return getProperty(property_reference.name);
+  }
+
+ private:
+  MinifiControllerServiceContext* impl_;
+};
+
+}  // namespace org::apache::nifi::minifi::api::core
diff --git a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h new file mode 100644 index 0000000..77fe1e3 --- /dev/null +++ b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "ControllerServiceContext.h"
+#include "minifi-cpp/core/ControllerServiceMetadata.h"
+#include "minifi-cpp/utils/Id.h"
+#include "utils/SmallString.h"
+
+namespace org::apache::nifi::minifi::api {
+
+namespace core {
+
+// Base class for controller services written against the C++ extension API.
+// Subclasses must provide enableImpl() and may override notifyStopImpl().
+// Instances are neither copyable nor movable.
+class ControllerServiceImpl {
+ public:
+  explicit ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata);
+
+  ControllerServiceImpl(const ControllerServiceImpl&) = delete;
+  ControllerServiceImpl(ControllerServiceImpl&&) = delete;
+  ControllerServiceImpl& operator=(const ControllerServiceImpl&) = delete;
+  ControllerServiceImpl& operator=(ControllerServiceImpl&&) = delete;
+
+  virtual ~ControllerServiceImpl();
+
+  // Runs enableImpl(); exceptions derived from std::exception are logged and rethrown.
+  MinifiStatus enable(ControllerServiceContext&);
+  // Runs notifyStopImpl(); exceptions derived from std::exception are logged and rethrown.
+  void notifyStop();
+
+  [[nodiscard]] std::string getName() const;
+  [[nodiscard]] minifi::utils::Identifier getUUID() const;
+  [[nodiscard]] minifi::utils::SmallString<36> getUUIDStr() const;
+
+ protected:
+  // Service-specific enable logic; the returned status is passed through by enable().
+  virtual MinifiStatus enableImpl(api::core::ControllerServiceContext&) = 0;
+  // Optional stop hook; the default implementation does nothing.
+  virtual void notifyStopImpl() {}
+
+  // metadata_ must stay declared before logger_: the constructor initializes logger_ from metadata_.logger.
+  minifi::core::ControllerServiceMetadata metadata_;
+  std::shared_ptr<minifi::core::logging::Logger> logger_;
+};
+
+}  // namespace core
+}  // namespace org::apache::nifi::minifi::api
diff --git a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h index 50f5eba..ab82b94 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h
@@ -34,6 +34,7 @@ nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const { return getProperty(property_reference.name, flow_file); } + nonstd::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const; bool hasNonEmptyProperty(std::string_view name) const;
diff --git a/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h b/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h index 57e13e6..43200f5 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h
@@ -17,9 +17,7 @@ #pragma once #include <atomic> -#include <condition_variable> #include <memory> -#include <mutex> #include <string> #include <vector> @@ -53,7 +51,7 @@ virtual ~ProcessorImpl(); - void setTriggerWhenEmpty(bool trigger_when_empty) { + void setTriggerWhenEmpty(const bool trigger_when_empty) { trigger_when_empty_ = trigger_when_empty; } @@ -74,8 +72,8 @@ static constexpr auto OutputAttributes = std::array<minifi::core::OutputAttributeReference, 0>{}; std::string getName() const; - utils::Identifier getUUID() const; - utils::SmallString<36> getUUIDStr() const; + minifi::utils::Identifier getUUID() const; + minifi::utils::SmallString<36> getUUIDStr() const; virtual PublishedMetrics calculateMetrics() const {return {};} @@ -89,9 +87,6 @@ std::atomic<bool> trigger_when_empty_; std::shared_ptr<minifi::core::logging::Logger> logger_; - - private: - mutable std::mutex mutex_; }; } // namespace core
diff --git a/extension-framework/cpp-extension-lib/include/api/core/Resource.h b/extension-framework/cpp-extension-lib/include/api/core/Resource.h index 292a996..c977733 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/Resource.h +++ b/extension-framework/cpp-extension-lib/include/api/core/Resource.h
@@ -26,14 +26,16 @@ #define WIN32_LEAN_AND_MEAN 1 #endif -#include "minifi-c.h" -#include "core/ClassName.h" -#include "api/utils/minifi-c-utils.h" +#include "ControllerServiceContext.h" +#include "FlowFile.h" #include "ProcessContext.h" #include "ProcessSession.h" -#include "FlowFile.h" -#include "minifi-cpp/core/ProcessorMetadata.h" +#include "api/utils/minifi-c-utils.h" +#include "core/ClassName.h" #include "logging/Logger.h" +#include "minifi-c.h" +#include "minifi-cpp/core/ControllerServiceMetadata.h" +#include "minifi-cpp/core/ProcessorMetadata.h" namespace org::apache::nifi::minifi::api::core { @@ -94,11 +96,12 @@ .callbacks = MinifiProcessorCallbacks{ .create = [] (MinifiProcessorMetadata metadata) -> MINIFI_OWNED void* { - return new Class{minifi::core::ProcessorMetadata{ - .uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(), - .name = std::string{metadata.name.data, metadata.name.length}, - .logger = std::make_shared<logging::Logger>(metadata.logger) - }}; + try { + return new Class{minifi::core::ProcessorMetadata{ + .uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(), + .name = std::string{metadata.name.data, metadata.name.length}, + .logger = std::make_shared<logging::Logger>(metadata.logger)}}; + } catch (...) { return nullptr; } }, .destroy = [] (MINIFI_OWNED void* self) -> void { delete static_cast<Class*>(self); @@ -127,7 +130,9 @@ } }, .onUnSchedule = [] (void* self) -> void { - static_cast<Class*>(self)->onUnSchedule(); + try { + static_cast<Class*>(self)->onUnSchedule(); + } catch (...) 
{} }, .calculateMetrics = [] (void* self) -> MINIFI_OWNED MinifiPublishedMetrics* { auto metrics = static_cast<Class*>(self)->calculateMetrics(); @@ -145,4 +150,43 @@ fn(description); } +template<typename Class, typename Fn> +void useControllerServiceClassDescription(Fn&& fn) { + std::vector<std::vector<MinifiStringView>> string_vector_cache; + + const auto full_name = minifi::core::className<Class>(); + + std::vector<MinifiPropertyDefinition> class_properties = utils::toProperties(Class::Properties, string_vector_cache); + + MinifiControllerServiceClassDefinition description{.full_name = utils::toStringView(full_name), + .description = utils::toStringView(Class::Description), + .class_properties_count = gsl::narrow<uint32_t>(class_properties.size()), + .class_properties_ptr = class_properties.data(), + + .callbacks = MinifiControllerServiceCallbacks{ + .create = [](MinifiControllerServiceMetadata metadata) -> MINIFI_OWNED void* { + try { + return new Class{minifi::core::ControllerServiceMetadata{ + .uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(), + .name = std::string{metadata.name.data, metadata.name.length}, + .logger = std::make_shared<logging::Logger>(metadata.logger)}}; + } catch (...) { return nullptr; } + }, + .destroy = [](MINIFI_OWNED void* self) -> void { delete static_cast<Class*>(self); }, + .enable = [](void* self, MinifiControllerServiceContext* context) -> MinifiStatus { + ControllerServiceContext context_wrapper(context); + try { + return static_cast<Class*>(self)->enable(context_wrapper); + } catch (...) { return MINIFI_STATUS_UNKNOWN_ERROR; } + }, + .notifyStop = [](void* self) -> void { + try { + static_cast<Class*>(self)->notifyStop(); + } catch (...) {} + }, + }}; + + fn(description); +} + } // namespace org::apache::nifi::minifi::api::core
diff --git a/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h b/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h index 6a14699..802ae02 100644 --- a/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h +++ b/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h
@@ -167,4 +167,38 @@
   return result.value();
 }
 
+// Resolves the controller service named by property `prop`.
+// Returns nullptr when the property is unset or empty, or when the lookup in the context fails.
+template<typename ControllerServiceType>
+ControllerServiceType* parseOptionalControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) {
+  const auto controller_service_name = context.getProperty(prop.name);
+  if (!controller_service_name || controller_service_name->empty()) {
+    return nullptr;
+  }
+
+  nonstd::expected<MinifiControllerService*, std::error_code> service = context.getControllerService(*controller_service_name, minifi::core::className<ControllerServiceType>());
+  if (!service) {
+    return nullptr;
+  }
+
+  // NOTE(review): assumes the handle returned by the context points at a ControllerServiceType instance - confirm.
+  return reinterpret_cast<ControllerServiceType*>(*service);
+}
+
+// Like parseOptionalControllerService, but throws PROCESS_SCHEDULE_EXCEPTION instead of returning nullptr.
+// The message distinguishes an unset property from a service that could not be resolved.
+template<typename ControllerServiceType>
+gsl::not_null<ControllerServiceType*> parseControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) {
+  const auto controller_service_name = context.getProperty(prop.name);
+  if (!controller_service_name || controller_service_name->empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Required controller service property '{}' is missing", prop.name));
+  }
+
+  auto controller_service = parseOptionalControllerService<ControllerServiceType>(context, prop);
+  if (!controller_service) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Controller service '{}' = '{}' not found", prop.name, *controller_service_name));
+  }
+  return gsl::make_not_null(controller_service);
+}
+
 }  // namespace org::apache::nifi::minifi::api::utils
diff --git a/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h b/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h index 142f470..cc61aea 100644 --- a/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h +++ b/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h
@@ -1,5 +1,5 @@ /** -* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -17,22 +17,40 @@ */ #pragma once -#include "core/Processor.h" -#include "minifi-cpp/core/ProcessorMetadata.h" #include "api/core/Resource.h" +#include "core/Processor.h" +#include "core/controller/ControllerService.h" +#include "minifi-cpp/core/ControllerServiceMetadata.h" +#include "minifi-cpp/core/ProcessorMetadata.h" +#include "utils/CControllerService.h" #include "utils/CProcessor.h" +#include "minifi-cpp/agent/agent_docs.h" namespace org::apache::nifi::minifi::test::utils { -template<typename T, typename ...Args> -std::unique_ptr<minifi::core::Processor> make_custom_c_processor(minifi::core::ProcessorMetadata metadata, Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) +template<typename T, typename... Args> +std::unique_ptr<minifi::core::Processor> make_custom_c_processor(minifi::core::ProcessorMetadata metadata, + Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) std::unique_ptr<minifi::core::ProcessorApi> processor_impl; - minifi::api::core::useProcessorClassDescription<T>([&] (const MinifiProcessorClassDefinition& description) { - minifi::utils::useCProcessorClassDescription(description, [&] (const auto&, auto c_description) { + minifi::api::core::useProcessorClassDescription<T>([&](const MinifiProcessorClassDefinition& description) { + minifi::utils::useCProcessorClassDescription(description, [&](const auto&, auto c_description) { processor_impl = std::make_unique<minifi::utils::CProcessor>(std::move(c_description), metadata, new T(metadata, std::forward<Args>(args)...)); }); }); return std::make_unique<minifi::core::Processor>(metadata.name, metadata.uuid, std::move(processor_impl)); } +template<typename T, typename... 
Args> +std::shared_ptr<minifi::core::controller::ControllerService> make_custom_c_controller_service(minifi::core::ControllerServiceMetadata metadata, + Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) + std::unique_ptr<minifi::core::controller::ControllerServiceApi> controller_service_impl; + minifi::api::core::useControllerServiceClassDescription<T>([&](const MinifiControllerServiceClassDefinition& description) { + minifi::utils::useCControllerServiceClassDescription(description, [&](const auto&, auto c_description) { + controller_service_impl = std::make_unique<minifi::utils::CControllerService>(std::move(c_description), + metadata, + new T(metadata, std::forward<Args>(args)...)); + }); + }); + return std::make_shared<minifi::core::controller::ControllerService>(metadata.name, metadata.uuid, std::move(controller_service_impl)); +} } // namespace org::apache::nifi::minifi::test::utils
diff --git a/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp b/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp new file mode 100644 index 0000000..a3b9254 --- /dev/null +++ b/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "api/core/ControllerServiceContext.h"
+
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "api/utils/minifi-c-utils.h"
+
+namespace org::apache::nifi::minifi::api::core {
+
+// Fetches the property `name` through MinifiControllerServiceContextGetProperty.
+// The C callback copies the reported string view into a local optional.
+nonstd::expected<std::string, std::error_code> ControllerServiceContext::getProperty(const std::string_view name) const {
+  std::optional<std::string> value = std::nullopt;
+  const MinifiStatus status = MinifiControllerServiceContextGetProperty(impl_, utils::toStringView(name),
+      [] (void* data, const MinifiStringView result) {
+        (*static_cast<std::optional<std::string>*>(data)) = std::string(result.data, result.length);
+      }, &value);
+
+  if (status != MINIFI_STATUS_SUCCESS) {
+    return nonstd::make_unexpected(utils::make_error_code(status));
+  }
+
+  // Success was reported but the callback never stored a value.
+  if (!value) {
+    return nonstd::make_unexpected(utils::make_error_code(MINIFI_STATUS_UNKNOWN_ERROR));
+  }
+  // Move the string out of the local optional instead of copying it.
+  return std::move(*value);
+}
+
+}  // namespace org::apache::nifi::minifi::api::core
diff --git a/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp b/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp new file mode 100644 index 0000000..b17dd77 --- /dev/null +++ b/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "api/core/ControllerServiceImpl.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "fmt/format.h"
+
+namespace org::apache::nifi::minifi::api::core {
+
+ControllerServiceImpl::ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata)
+    : metadata_(std::move(metadata)),
+      // Read the logger from the member: the parameter has just been moved from.
+      logger_(metadata_.logger) {
+  logger_->log_debug("ControllerService {} created with uuid {}", getName(), getUUIDStr());
+}
+
+ControllerServiceImpl::~ControllerServiceImpl() {
+  logger_->log_debug("Destroying controller service {} with uuid {}", getName(), getUUIDStr());
+}
+
+// Delegates to enableImpl(); a std::exception is logged here and then rethrown to the caller.
+MinifiStatus ControllerServiceImpl::enable(ControllerServiceContext& ctx) {
+  try {
+    return enableImpl(ctx);
+  } catch (const std::exception& e) {
+    logger_->log_error("{}", e.what());
+    throw;
+  }
+}
+
+// Delegates to notifyStopImpl(); a std::exception is logged here and then rethrown to the caller.
+void ControllerServiceImpl::notifyStop() {
+  try {
+    notifyStopImpl();
+  } catch (const std::exception& e) {
+    logger_->log_error("{}", e.what());
+    throw;
+  }
+}
+
+std::string ControllerServiceImpl::getName() const {
+  return metadata_.name;
+}
+
+minifi::utils::Identifier ControllerServiceImpl::getUUID() const {
+  return metadata_.uuid;
+}
+
+minifi::utils::SmallString<36> ControllerServiceImpl::getUUIDStr() const {
+  return getUUID().to_string();
+}
+
+}  // namespace org::apache::nifi::minifi::api::core
diff --git a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp index 58016d8..ec5951d 100644 --- a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp +++ b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp
@@ -38,4 +38,17 @@
   return MinifiProcessContextHasNonEmptyProperty(impl_, utils::toStringView(name));
 }
 
+// Asks the process context for the controller service with the given name and class.
+// Yields the service handle on success, otherwise the error code for the returned status.
+nonstd::expected<MinifiControllerService*, std::error_code> ProcessContext::getControllerService(const std::string_view controller_service_name,
+    const std::string_view controller_service_class) const {
+  void* raw_service = nullptr;
+  const MinifiStatus lookup_status = MinifiProcessContextGetControllerService(
+      impl_, utils::toStringView(controller_service_name), utils::toStringView(controller_service_class), &raw_service);
+  if (lookup_status == MINIFI_STATUS_SUCCESS) {
+    return static_cast<MinifiControllerService*>(raw_service);
+  }
+  return nonstd::make_unexpected(utils::make_error_code(lookup_status));
+}
+
 }  // namespace org::apache::nifi::minifi::api::core
diff --git a/libminifi/include/utils/CControllerService.h b/libminifi/include/utils/CControllerService.h index 60b8639..049f823 100644 --- a/libminifi/include/utils/CControllerService.h +++ b/libminifi/include/utils/CControllerService.h
@@ -26,8 +26,12 @@ #include "minifi-cpp/core/Property.h" #include "minifi-cpp/core/controller/ControllerServiceApi.h" +namespace org::apache::nifi::minifi { +struct ClassDescription; +} + namespace org::apache::nifi::minifi::utils { -class CControllerService; + struct CControllerServiceClassDescription { @@ -104,7 +108,6 @@ }; void useCControllerServiceClassDescription(const MinifiControllerServiceClassDefinition& class_description, - const BundleIdentifier& bundle_id, - const std::function<void(ClassDescription, CControllerServiceClassDescription)>& fn); + const std::function<void(const ClassDescription&, CControllerServiceClassDescription)>& fn); } // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/minifi-c.cpp b/libminifi/src/minifi-c.cpp index 98735db..9dd9710 100644 --- a/libminifi/src/minifi-c.cpp +++ b/libminifi/src/minifi-c.cpp
@@ -258,7 +258,7 @@ } void useCControllerServiceClassDescription(const MinifiControllerServiceClassDefinition& class_description, - const std::function<void(ClassDescription, CControllerServiceClassDescription)>& fn) { + const std::function<void(const ClassDescription&, CControllerServiceClassDescription)>& fn) { std::vector<minifi::core::Property> properties; properties.reserve(class_description.class_properties_count); for (size_t i = 0; i < class_description.class_properties_count; ++i) {
diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index 320b542..faa1456 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -24,30 +24,30 @@ #include <sstream> #include <utility> -#include "core/Processor.h" +#include "Connection.h" +#include "LogUtils.h" +#include "ProvenanceTestHelper.h" +#include "ResourceClaim.h" +#include "TestUtils.h" #include "core/ProcessContextImpl.h" -#include "minifi-cpp/core/PropertyDefinition.h" +#include "core/ProcessSessionFactory.h" +#include "core/Processor.h" +#include "core/controller/StandardControllerServiceNode.h" +#include "core/controller/StandardControllerServiceProvider.h" #include "core/logging/LoggerConfiguration.h" #include "core/state/nodes/FlowInformation.h" -#include "core/controller/StandardControllerServiceProvider.h" -#include "ProvenanceTestHelper.h" +#include "fmt/format.h" +#include "io/StreamPipe.h" +#include "minifi-cpp/core/ProcessContext.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "spdlog/sinks/dist_sink.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/sinks/stdout_sinks.h" #include "utils/ClassUtils.h" -#include "TestUtils.h" +#include "utils/GeneralUtils.h" #include "utils/Id.h" #include "utils/StringUtils.h" #include "utils/span.h" -#include "LogUtils.h" -#include "utils/GeneralUtils.h" -#include "Connection.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSessionFactory.h" -#include "ResourceClaim.h" -#include "io/StreamPipe.h" - -#include "fmt/format.h" -#include "spdlog/sinks/stdout_sinks.h" -#include "spdlog/sinks/ostream_sink.h" -#include "spdlog/sinks/dist_sink.h" std::shared_ptr<LogTestController> LogTestController::getInstance(const std::shared_ptr<logging::LoggerProperties>& logger_properties) { static std::map<std::shared_ptr<logging::LoggerProperties>, std::shared_ptr<LogTestController>> map; @@ -359,6 +359,27 @@ return retVal; } +std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addController(const std::string& name, const std::shared_ptr<core::controller::ControllerService>& cs) { + if (finalized) { + return nullptr; + } + 
std::lock_guard<std::recursive_mutex> guard(mutex); + + std::shared_ptr<minifi::core::controller::ControllerServiceNode> controller_service_node = std::make_shared<core::controller::StandardControllerServiceNode>(cs, name, configuration_); + + controller_service_nodes_.push_back(controller_service_node); + + minifi::utils::Identifier uuid = minifi::utils::IdGenerator::getIdGenerator()->generate(); + + controller_service_node->initialize(); + controller_service_node->setUUID(uuid); + + root_process_group_->addControllerService(name, controller_service_node, uuid.to_string()); + controller_services_provider_->putControllerServiceNode(name, controller_service_node, root_process_group_.get(), uuid.to_string()); + + return controller_service_node; +} + std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addController(const std::string &controller_name, const std::string &name) { if (finalized) { return nullptr; @@ -485,7 +506,7 @@ } void TestPlan::scheduleProcessor(minifi::core::Processor* processor, const std::shared_ptr<minifi::core::ProcessContext>& context) { - if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) { + if (std::ranges::find(configured_processors_, processor) == configured_processors_.end()) { // Ordering on factories and list of configured processors do not matter const auto factory = std::make_shared<minifi::core::ProcessSessionFactoryImpl>(context); factories_.push_back(factory); @@ -506,7 +527,7 @@ } } -bool TestPlan::runProcessor(minifi::core::Processor* processor, const PreTriggerVerifier& verify) { +bool TestPlan::runProcessor(const minifi::core::Processor* processor, const PreTriggerVerifier& verify) { const auto processor_location = gsl::narrow<size_t>(std::distance(processor_queue_.begin(), getProcessorItByUuid(processor->getUUIDStr()))); return runProcessor(processor_location, verify); } @@ -576,7 +597,7 @@ const auto isFlowFileProduced = [&] { 
runCurrentProcessor(); const std::vector<minifi::Connection*> connections = getProcessorOutboundConnections(processor_queue_.at(location)); - return std::any_of(connections.cbegin(), connections.cend(), [] (const minifi::Connection* conn) { return !conn->isEmpty(); }); + return std::ranges::any_of(connections, [] (const minifi::Connection* conn) { return !conn->isEmpty(); }); }; return verifyEventHappenedInPollTime(wait_duration, isFlowFileProduced); } @@ -600,8 +621,7 @@ std::vector<minifi::Connection*> connections = getProcessorOutboundConnections(processor); for (auto connection : connections) { std::set<std::shared_ptr<minifi::core::FlowFile>> expiredFlowRecords; - std::shared_ptr<minifi::core::FlowFile> flowfile = connection->poll(expiredFlowRecords); - if (flowfile) { + if (std::shared_ptr<minifi::core::FlowFile> flowfile = connection->poll(expiredFlowRecords)) { return flowfile; } if (expiredFlowRecords.empty()) {
diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h index e8f4883..e308f41 100644 --- a/libminifi/test/libtest/unit/TestBase.h +++ b/libminifi/test/libtest/unit/TestBase.h
@@ -275,6 +275,7 @@ minifi::Connection* addConnection(minifi::core::Processor* source_proc, const minifi::core::Relationship& source_relationship, minifi::core::Processor* destination_proc); + std::shared_ptr<minifi::core::controller::ControllerServiceNode> addController(const std::string& name, const std::shared_ptr<minifi::core::controller::ControllerService>& cs); std::shared_ptr<minifi::core::controller::ControllerServiceNode> addController(const std::string &controller_name, const std::string &name); bool setProperty(minifi::core::Processor* processor, const core::PropertyReference& property, std::string_view value); @@ -298,7 +299,7 @@ void scheduleProcessor(minifi::core::Processor* processor); void scheduleProcessors(); - bool runProcessor(minifi::core::Processor* processor, const PreTriggerVerifier& verify = nullptr); + bool runProcessor(const minifi::core::Processor* processor, const PreTriggerVerifier& verify = nullptr); bool runProcessor(size_t target_location, const PreTriggerVerifier& verify = nullptr); bool runNextProcessor(const PreTriggerVerifier& verify = nullptr); bool runCurrentProcessor();
diff --git a/minifi-api/include/minifi-c/minifi-c.h b/minifi-api/include/minifi-c/minifi-c.h index b689ce7..2c31b58 100644 --- a/minifi-api/include/minifi-c/minifi-c.h +++ b/minifi-api/include/minifi-c/minifi-c.h
@@ -100,6 +100,7 @@ typedef struct MinifiPublishedMetrics MinifiPublishedMetrics; typedef struct MinifiAgent MinifiAgent; +typedef struct MinifiControllerService MinifiControllerService; typedef struct MinifiControllerServiceContext MinifiControllerServiceContext; typedef enum MinifiStatus : uint32_t {