blob: 8159983565ce7b856ff46b44a0a090d2f3c93400 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "minifi-c/minifi-c.h"
#include <memory>
#include <vector>
#include "agent/agent_docs.h"
#include "core/ProcessorMetrics.h"
#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/ClassLoader.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "minifi-cpp/core/ProcessSession.h"
#include "minifi-cpp/core/ProcessorApi.h"
#include "minifi-cpp/core/ProcessorDescriptor.h"
#include "minifi-cpp/core/ProcessorFactory.h"
#include "minifi-cpp/core/ProcessorMetadata.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "minifi-cpp/core/logging/Logger.h"
#include "minifi-cpp/core/state/PublishedMetricProvider.h"
#include "minifi-cpp/Exception.h"
#include "core/extension/ExtensionManager.h"
#include "utils/PropertyErrors.h"
#include "minifi-cpp/agent/build_description.h"
#include "utils/CProcessor.h"
namespace minifi = org::apache::nifi::minifi;
namespace {
std::string toString(MinifiStringView sv) {
return {sv.data, sv.length};
}
std::string_view toStringView(MinifiStringView sv) {
return {sv.data, sv.length};
}
minifi::core::annotation::Input toInputRequirement(MinifiInputRequirement req) {
switch (req) {
case MINIFI_INPUT_REQUIRED: return minifi::core::annotation::Input::INPUT_REQUIRED;
case MINIFI_INPUT_ALLOWED: return minifi::core::annotation::Input::INPUT_ALLOWED;
case MINIFI_INPUT_FORBIDDEN: return minifi::core::annotation::Input::INPUT_FORBIDDEN;
}
gsl_FailFast();
}
minifi::core::logging::LOG_LEVEL toLogLevel(MinifiLogLevel lvl) {
switch (lvl) {
case MINIFI_LOG_LEVEL_TRACE: return minifi::core::logging::trace;
case MINIFI_LOG_LEVEL_DEBUG: return minifi::core::logging::debug;
case MINIFI_LOG_LEVEL_INFO: return minifi::core::logging::info;
case MINIFI_LOG_LEVEL_WARNING: return minifi::core::logging::warn;
case MINIFI_LOG_LEVEL_ERROR: return minifi::core::logging::err;
case MINIFI_LOG_LEVEL_CRITICAL: return minifi::core::logging::critical;
case MINIFI_LOG_LEVEL_OFF: return minifi::core::logging::off;
}
gsl_FailFast();
}
gsl::not_null<const minifi::core::PropertyValidator*> toPropertyValidator(MinifiValidator validator) {
switch (validator) {
case MINIFI_VALIDATOR_ALWAYS_VALID: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR);
case MINIFI_VALIDATOR_NON_BLANK: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::NON_BLANK_VALIDATOR);
case MINIFI_VALIDATOR_TIME_PERIOD: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR);
case MINIFI_VALIDATOR_BOOLEAN: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::BOOLEAN_VALIDATOR);
case MINIFI_VALIDATOR_INTEGER: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::INTEGER_VALIDATOR);
case MINIFI_VALIDATOR_UNSIGNED_INTEGER: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR);
case MINIFI_VALIDATOR_DATA_SIZE: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::DATA_SIZE_VALIDATOR);
case MINIFI_VALIDATOR_PORT: return gsl::make_not_null(&minifi::core::StandardPropertyValidators::PORT_VALIDATOR);
}
gsl_FailFast();
}
minifi::core::Property createProperty(const MinifiPropertyDefinition* property_description) {
gsl_Expects(property_description);
std::vector<std::string_view> allowed_values;
allowed_values.reserve(property_description->allowed_values_count);
for (size_t i = 0; i < property_description->allowed_values_count; ++i) {
allowed_values.push_back(toStringView(property_description->allowed_values_ptr[i]));
}
std::vector<std::string_view> allowed_types;
if (property_description->type) {
allowed_types.push_back(toStringView(*property_description->type));
}
std::optional<std::string_view> default_value;
if (property_description->default_value) {
default_value = toStringView(*property_description->default_value);
}
return minifi::core::Property{minifi::core::PropertyReference{
toStringView(property_description->name),
toStringView(property_description->display_name),
toStringView(property_description->description),
property_description->is_required,
property_description->is_sensitive,
std::span(allowed_values),
std::span(allowed_types),
default_value,
toPropertyValidator(property_description->validator),
property_description->supports_expression_language
}};
}
class CProcessorFactory : public minifi::core::ProcessorFactory {
public:
CProcessorFactory(std::string group_name, std::string class_name, minifi::utils::CProcessorClassDescription class_description)
: group_name_(std::move(group_name)),
class_name_(std::move(class_name)),
class_description_(std::move(class_description)) {}
std::unique_ptr<minifi::core::ProcessorApi> create(minifi::core::ProcessorMetadata metadata) override {
return std::make_unique<minifi::utils::CProcessor>(class_description_, metadata);
}
[[nodiscard]] std::string getGroupName() const override {
return group_name_;
}
[[nodiscard]] std::string getClassName() const override {
return class_name_;
}
CProcessorFactory() = delete;
CProcessorFactory(const CProcessorFactory&) = delete;
CProcessorFactory& operator=(const CProcessorFactory&) = delete;
CProcessorFactory(CProcessorFactory&&) = delete;
CProcessorFactory& operator=(CProcessorFactory&&) = delete;
~CProcessorFactory() override = default;
private:
std::string group_name_;
std::string class_name_;
minifi::utils::CProcessorClassDescription class_description_;
};
} // namespace
namespace org::apache::nifi::minifi::utils {
void useCProcessorClassDescription(const MinifiProcessorClassDefinition& class_description, const std::function<void(minifi::ClassDescription, minifi::utils::CProcessorClassDescription)>& fn) {
std::vector<minifi::core::Property> properties;
properties.reserve(class_description.class_properties_count);
for (size_t i = 0; i < class_description.class_properties_count; ++i) {
properties.push_back(createProperty(&class_description.class_properties_ptr[i]));
}
std::vector<minifi::core::DynamicProperty> dynamic_properties;
dynamic_properties.reserve(class_description.dynamic_properties_count);
for (size_t i = 0; i < class_description.dynamic_properties_count; ++i) {
dynamic_properties.push_back(minifi::core::DynamicPropertyDefinition{
.name = toStringView(class_description.dynamic_properties_ptr[i].name),
.value = toStringView(class_description.dynamic_properties_ptr[i].value),
.description = toStringView(class_description.dynamic_properties_ptr[i].description),
.supports_expression_language = class_description.dynamic_properties_ptr[i].supports_expression_language
});
}
std::vector<minifi::core::Relationship> relationships;
relationships.reserve(class_description.class_relationships_count);
for (size_t i = 0; i < class_description.class_relationships_count; ++i) {
relationships.push_back(minifi::core::Relationship{
toString(class_description.class_relationships_ptr[i].name),
toString(class_description.class_relationships_ptr[i].description)
});
}
std::vector<minifi::core::OutputAttribute> output_attributes;
for (size_t attribute_idx = 0; attribute_idx < class_description.output_attributes_count; ++attribute_idx) {
minifi::core::OutputAttribute output_attribute{};
output_attribute.name = toString(class_description.output_attributes_ptr[attribute_idx].name);
output_attribute.description = toString(class_description.output_attributes_ptr[attribute_idx].description);
for (size_t rel_idx = 0; rel_idx < class_description.output_attributes_ptr[attribute_idx].relationships_count; ++rel_idx) {
auto output_attribute_rel_name = toStringView(class_description.output_attributes_ptr[attribute_idx].relationships_ptr[rel_idx]);
auto rel_it = std::find(relationships.begin(), relationships.end(), minifi::core::Relationship{std::string{output_attribute_rel_name}, ""});
gsl_Assert(rel_it != relationships.end());
output_attribute.relationships.push_back(*rel_it);
}
output_attributes.push_back(output_attribute);
}
auto name_segments = minifi::utils::string::split(toStringView(class_description.full_name), "::");
gsl_Assert(!name_segments.empty());
minifi::ClassDescription description{
.type_ = minifi::ResourceType::Processor,
.short_name_ = name_segments.back(),
.full_name_ = minifi::utils::string::join(".", name_segments),
.description_ = toString(class_description.description),
.class_properties_ = properties,
.dynamic_properties_ = dynamic_properties,
.class_relationships_ = relationships,
.output_attributes_ = output_attributes,
.supports_dynamic_properties_ = class_description.supports_dynamic_properties,
.supports_dynamic_relationships_ = class_description.supports_dynamic_relationships,
.inputRequirement_ = minifi::core::annotation::toString(toInputRequirement(class_description.input_requirement)),
.isSingleThreaded_ = class_description.is_single_threaded,
};
minifi::utils::CProcessorClassDescription c_class_description{
.name = name_segments.back(),
.class_properties = properties,
.class_relationships = relationships,
.supports_dynamic_properties = description.supports_dynamic_properties_,
.supports_dynamic_relationships = description.supports_dynamic_relationships_,
.input_requirement = toInputRequirement(class_description.input_requirement),
.is_single_threaded = description.isSingleThreaded_,
.callbacks = class_description.callbacks
};
fn(description, c_class_description);
}
} // namespace org::apache::nifi::minifi::utils
extern "C" {
MinifiExtension* MinifiCreateExtension(MinifiStringView /*api_version*/, const MinifiExtensionCreateInfo* extension_create_info) {
gsl_Assert(extension_create_info);
auto extension_name = toString(extension_create_info->name);
minifi::BundleDetails bundle{
.artifact = extension_name,
.group = "org.apache.nifi.minifi",
.version = toString(extension_create_info->version)
};
for (size_t proc_idx = 0; proc_idx < extension_create_info->processors_count; ++proc_idx) {
minifi::utils::useCProcessorClassDescription(extension_create_info->processors_ptr[proc_idx], [&] (const auto& description, const auto& c_class_description) {
minifi::ExternalBuildDescription::addExternalComponent(bundle, description);
minifi::core::ClassLoader::getDefaultClassLoader().getClassLoader(extension_name).registerClass(
c_class_description.name,
std::make_unique<CProcessorFactory>(extension_name, toString(extension_create_info->processors_ptr[proc_idx].full_name), c_class_description));
});
}
return reinterpret_cast<MinifiExtension*>(new org::apache::nifi::minifi::core::extension::Extension::Info{
.name = toString(extension_create_info->name),
.version = toString(extension_create_info->version),
.deinit = extension_create_info->deinit,
.user_data = extension_create_info->user_data
});
}
MinifiStatus MinifiProcessContextGetProperty(MinifiProcessContext* context, MinifiStringView property_name, MinifiFlowFile* flowfile,
void (*result_cb)(void* user_ctx, MinifiStringView result), void* user_ctx) {
gsl_Assert(context != MINIFI_NULL);
auto result = reinterpret_cast<minifi::core::ProcessContext*>(context)->getProperty(toStringView(property_name),
flowfile != MINIFI_NULL ? reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile)->get() : nullptr);
if (result) {
result_cb(user_ctx, MinifiStringView{.data = result.value().data(), .length = result.value().length()});
return MINIFI_STATUS_SUCCESS;
}
switch (static_cast<minifi::core::PropertyErrorCode>(result.error().value())) {
case minifi::core::PropertyErrorCode::NotSupportedProperty: return MINIFI_STATUS_NOT_SUPPORTED_PROPERTY;
case minifi::core::PropertyErrorCode::DynamicPropertiesNotSupported: return MINIFI_STATUS_DYNAMIC_PROPERTIES_NOT_SUPPORTED;
case minifi::core::PropertyErrorCode::PropertyNotSet: return MINIFI_STATUS_PROPERTY_NOT_SET;
case minifi::core::PropertyErrorCode::ValidationFailed: return MINIFI_STATUS_VALIDATION_FAILED;
default: return MINIFI_STATUS_UNKNOWN_ERROR;
}
}
void MinifiProcessContextGetProcessorName(MinifiProcessContext* context, void(*cb)(void* user_ctx, MinifiStringView result), void* user_ctx) {
gsl_Assert(context != MINIFI_NULL);
auto name = reinterpret_cast<minifi::core::ProcessContext*>(context)->getProcessorInfo().getName();
cb(user_ctx, MinifiStringView{.data = name.data(), .length = name.length()});
}
MinifiBool MinifiProcessContextHasNonEmptyProperty(MinifiProcessContext* context, MinifiStringView property_name) {
gsl_Assert(context != MINIFI_NULL);
return reinterpret_cast<minifi::core::ProcessContext*>(context)->hasNonEmptyProperty(toString(property_name));
}
void MinifiConfigGet(MinifiConfig* configure, MinifiStringView key, void(*cb)(void* user_ctx, MinifiStringView result), void* user_ctx) {
gsl_Assert(configure != MINIFI_NULL);
auto value = reinterpret_cast<minifi::Configure*>(configure)->get(toString(key));
if (value) {
cb(user_ctx, MinifiStringView{
.data = value->data(),
.length = value->length()
});
}
}
void MinifiLoggerSetMaxLogSize(MinifiLogger* logger, int32_t max_size) {
gsl_Assert(logger != MINIFI_NULL);
(*reinterpret_cast<std::shared_ptr<minifi::core::logging::Logger>*>(logger))->set_max_log_size(max_size);
}
void MinifiLoggerLogString(MinifiLogger* logger, MinifiLogLevel level, MinifiStringView msg) {
gsl_Assert(logger != MINIFI_NULL);
(*reinterpret_cast<std::shared_ptr<minifi::core::logging::Logger>*>(logger))->log_string(toLogLevel(level), toString(msg));
}
MinifiBool MinifiLoggerShouldLog(MinifiLogger* logger, MinifiLogLevel level) {
gsl_Assert(logger != MINIFI_NULL);
return (*reinterpret_cast<std::shared_ptr<minifi::core::logging::Logger>*>(logger))->should_log(toLogLevel(level));
}
MinifiLogLevel MinifiLoggerLevel(MinifiLogger* logger) {
gsl_Assert(logger != MINIFI_NULL);
switch ((*reinterpret_cast<std::shared_ptr<minifi::core::logging::Logger>*>(logger))->level()) {
case minifi::core::logging::LOG_LEVEL::trace: return MINIFI_LOG_LEVEL_TRACE;
case minifi::core::logging::LOG_LEVEL::debug: return MINIFI_LOG_LEVEL_DEBUG;
case minifi::core::logging::LOG_LEVEL::info: return MINIFI_LOG_LEVEL_INFO;
case minifi::core::logging::LOG_LEVEL::warn: return MINIFI_LOG_LEVEL_WARNING;
case minifi::core::logging::LOG_LEVEL::err: return MINIFI_LOG_LEVEL_ERROR;
case minifi::core::logging::LOG_LEVEL::critical: return MINIFI_LOG_LEVEL_CRITICAL;
case minifi::core::logging::LOG_LEVEL::off: return MINIFI_LOG_LEVEL_OFF;
}
gsl_FailFast();
}
MINIFI_OWNED gsl::owner<MinifiPublishedMetrics*> MinifiPublishedMetricsCreate(const size_t count, const MinifiStringView* metric_names, const double* metric_values) {
const gsl::owner<std::vector<minifi::state::PublishedMetric>*> metrics = new std::vector<minifi::state::PublishedMetric>(); // NOLINT(modernize-use-auto)
metrics->reserve(count);
for (size_t i = 0; i < count; i++) {
metrics->emplace_back(minifi::state::PublishedMetric{toString(metric_names[i]), metric_values[i], {}});
}
return reinterpret_cast<MinifiPublishedMetrics*>(metrics);
}
MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionGet(MinifiProcessSession* session) {
gsl_Assert(session != MINIFI_NULL);
if (const auto ff = reinterpret_cast<minifi::core::ProcessSession*>(session)->get()) {
return reinterpret_cast<MinifiFlowFile*>(new std::shared_ptr<minifi::core::FlowFile>(ff));
}
return MINIFI_NULL;
}
MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* session, MinifiFlowFile* parent) {
gsl_Assert(session != MINIFI_NULL);
if (const auto ff = reinterpret_cast<minifi::core::ProcessSession*>(session)->create(parent != MINIFI_NULL
? reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(parent)->get()
: nullptr)) {
return reinterpret_cast<MinifiFlowFile*>(new std::shared_ptr<minifi::core::FlowFile>(ff));
}
return MINIFI_NULL;
}
void MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name) {
gsl_Assert(flowfile != MINIFI_NULL);
reinterpret_cast<minifi::core::ProcessSession*>(session)->transfer(
*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile), minifi::core::Relationship{toString(relationship_name), ""});
}
void MinifiProcessSessionRemove(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile) {
gsl_Assert(flowfile != MINIFI_NULL);
reinterpret_cast<minifi::core::ProcessSession*>(session)->remove(*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile));
}
MinifiStatus MinifiProcessSessionRead(MinifiProcessSession* session, MinifiFlowFile* flowfile, int64_t(*cb)(void* user_ctx, MinifiInputStream*), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
try {
reinterpret_cast<minifi::core::ProcessSession*>(session)->read(*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile), [&] (auto& input_stream) -> int64_t {
return cb(user_ctx, reinterpret_cast<MinifiInputStream*>(input_stream.get()));
});
return MINIFI_STATUS_SUCCESS;
} catch (...) {
return MINIFI_STATUS_UNKNOWN_ERROR;
}
}
MinifiStatus MinifiProcessSessionWrite(MinifiProcessSession* session, MinifiFlowFile* ff, int64_t(*cb)(void* user_ctx, MinifiOutputStream*), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(ff != MINIFI_NULL);
try {
reinterpret_cast<minifi::core::ProcessSession*>(session)->write(*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(ff), [&] (auto& output_stream) -> int64_t {
return cb(user_ctx, reinterpret_cast<MinifiOutputStream*>(output_stream.get()));
});
return MINIFI_STATUS_SUCCESS;
} catch (...) {
return MINIFI_STATUS_UNKNOWN_ERROR;
}
}
size_t MinifiInputStreamSize(MinifiInputStream* stream) {
gsl_Assert(stream != MINIFI_NULL);
return reinterpret_cast<minifi::io::InputStream*>(stream)->size();
}
int64_t MinifiInputStreamRead(MinifiInputStream* stream, char* buffer, size_t size) {
gsl_Assert(stream != MINIFI_NULL);
return gsl::narrow<int64_t>(reinterpret_cast<minifi::io::InputStream*>(stream)->read(std::span(reinterpret_cast<std::byte*>(buffer), size)));
}
int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, size_t size) {
gsl_Assert(stream != MINIFI_NULL);
return gsl::narrow<int64_t>(reinterpret_cast<minifi::io::OutputStream*>(stream)->write(as_bytes(std::span(data, size))));
}
void MinifiStatusToString(MinifiStatus status, void(*cb)(void* user_ctx, MinifiStringView str), void* user_ctx) {
std::string message = [&] () -> std::string {
switch (status) {
case MINIFI_STATUS_SUCCESS: return "Success";
case MINIFI_STATUS_UNKNOWN_ERROR: return "Unknown error";
case MINIFI_STATUS_NOT_SUPPORTED_PROPERTY: return minifi::core::PropertyErrorCategory{}.message(static_cast<int>(minifi::core::PropertyErrorCode::NotSupportedProperty));
case MINIFI_STATUS_DYNAMIC_PROPERTIES_NOT_SUPPORTED: return minifi::core::PropertyErrorCategory{}.message(static_cast<int>(minifi::core::PropertyErrorCode::DynamicPropertiesNotSupported));
case MINIFI_STATUS_PROPERTY_NOT_SET: return minifi::core::PropertyErrorCategory{}.message(static_cast<int>(minifi::core::PropertyErrorCode::PropertyNotSet));
case MINIFI_STATUS_VALIDATION_FAILED: return minifi::core::PropertyErrorCategory{}.message(static_cast<int>(minifi::core::PropertyErrorCode::ValidationFailed));
default: return "Unknown error";
}
}();
cb(user_ctx, MinifiStringView{.data = message.data(), .length = message.size()});
}
void MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
if (attribute_value == nullptr) {
reinterpret_cast<minifi::core::ProcessSession*>(session)->removeAttribute(**reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile), toString(attribute_name));
} else {
reinterpret_cast<minifi::core::ProcessSession*>(session)->putAttribute(
**reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile), toString(attribute_name), toString(*attribute_value));
}
}
MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
auto value = (*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile))->getAttribute(toString(attribute_name));
if (!value.has_value()) {
return false;
}
cb(user_ctx, MinifiStringView{.data = value->data(), .length = value->size()});
return true;
}
void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
for (auto& [key, value] : (*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile))->getAttributes()) {
cb(user_ctx, MinifiStringView{.data = key.data(), .length = key.size()}, MinifiStringView{.data = value.data(), .length = value.size()});
}
}
} // extern "C"