blob: eff5f49aa049a309ddef250293e325b65b72a207 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/yaml/YamlFlowSerializer.h"
#include <unordered_set>
#include "utils/crypto/property_encryption/PropertyEncryptionUtils.h"
namespace org::apache::nifi::minifi::core::yaml {
void YamlFlowSerializer::addProviderCreatedParameterContexts(YAML::Node flow_definition_yaml, const core::flow::FlowSchema& schema,
const std::unordered_map<std::string, gsl::not_null<std::unique_ptr<ParameterContext>>>& parameter_contexts) {
std::vector<gsl::not_null<ParameterContext*>> provided_parameter_contexts;
for (const auto& [parameter_context_name, parameter_context] : parameter_contexts) {
if (!parameter_context->getParameterProvider().empty()) {
provided_parameter_contexts.push_back(gsl::make_not_null(parameter_context.get()));
}
}
if (provided_parameter_contexts.empty()) {
return;
}
auto parameter_contexts_node = flow_definition_yaml[schema.parameter_contexts[0]];
if (!parameter_contexts_node.IsDefined() || !parameter_contexts_node.IsSequence()) {
parameter_contexts_node = YAML::Node(YAML::NodeType::Sequence);
}
for (const auto& parameter_context : provided_parameter_contexts) {
YAML::Node parameter_context_node;
parameter_context_node[schema.identifier[0]] = std::string(parameter_context->getUUIDStr());
parameter_context_node[schema.name[0]] = parameter_context->getName();
parameter_context_node[schema.parameter_provider[0]] = parameter_context->getParameterProvider();
YAML::Node parameters_node = YAML::Node(YAML::NodeType::Sequence);
auto parameters = parameter_context->getParameters();
for (const auto& [name, parameter] : parameters) {
YAML::Node parameter_node;
parameter_node[schema.name[0]] = name;
parameter_node[schema.description[0]] = parameter.description;
parameter_node[schema.sensitive[0]] = parameter.sensitive;
parameter_node[schema.provided[0]] = parameter.provided;
parameter_node[schema.value[0]] = parameter.value;
parameters_node.push_back(parameter_node);
}
parameter_context_node[schema.parameters[0]] = parameters_node;
int64_t index = -1;
for (int64_t i = 0; i < gsl::narrow<int64_t>(parameter_contexts_node.size()); ++i) {
if (parameter_contexts_node[i][schema.name[0]].as<std::string>() == parameter_context->getName()) {
index = i;
break;
}
}
if (index != -1) {
parameter_contexts_node[index] = parameter_context_node;
} else {
parameter_contexts_node.push_back(parameter_context_node);
}
}
}
void YamlFlowSerializer::encryptSensitiveProperties(YAML::Node property_yamls,
const std::map<std::string, Property, std::less<>>& properties,
const utils::crypto::EncryptionProvider& encryption_provider,
const core::flow::Overrides& overrides) const {
std::unordered_set<std::string> processed_property_names;
for (auto kv : property_yamls) {
auto name = kv.first.as<std::string>();
if (!properties.contains(name)) {
logger_->log_warn("Property {} found in flow definition does not exist!", name);
continue;
}
if (properties.at(name).isSensitive()) {
if (kv.second.IsSequence()) {
for (auto property_item : kv.second) {
const auto override_value = overrides.get(name);
auto value = override_value ? *override_value : property_item["value"].as<std::string>();
property_item["value"] = utils::crypto::property_encryption::encrypt(value, encryption_provider);
}
} else {
const auto override_value = overrides.get(name);
auto value = override_value ? *override_value : kv.second.as<std::string>();
property_yamls[name] = utils::crypto::property_encryption::encrypt(value, encryption_provider);
}
processed_property_names.insert(name);
}
}
for (const auto& [name, value] : overrides.getRequired()) {
gsl_Expects(properties.contains(name) && properties.at(name).isSensitive());
if (processed_property_names.contains(name)) { continue; }
property_yamls[name] = utils::crypto::property_encryption::encrypt(value, encryption_provider);
}
}
void YamlFlowSerializer::encryptSensitiveParameters(YAML::Node& flow_definition_yaml, const core::flow::FlowSchema& schema, const utils::crypto::EncryptionProvider& encryption_provider,
const std::unordered_map<utils::Identifier, core::flow::Overrides>& overrides) const {
for (auto parameter_context : flow_definition_yaml[schema.parameter_contexts[0]]) {
for (auto parameter : parameter_context[schema.parameters[0]]) {
bool is_sensitive = false;
std::istringstream is(parameter[schema.sensitive[0]].Scalar());
is >> std::boolalpha >> is_sensitive;
if (!is_sensitive) {
continue;
}
const auto parameter_context_id_str = parameter_context[schema.identifier[0]].Scalar();
const auto parameter_context_id = utils::Identifier::parse(parameter_context_id_str);
if (!parameter_context_id) {
logger_->log_warn("Invalid parameter context ID found in the flow definition: {}", parameter_context_id_str);
continue;
}
auto parameter_value = parameter[schema.value[0]].Scalar();
if (overrides.contains(*parameter_context_id)) {
const auto& override_values = overrides.at(*parameter_context_id);
const auto parameter_name = parameter[schema.name[0]].Scalar();
if (auto parameter_override_value = override_values.get(parameter_name)) {
parameter_value = *parameter_override_value;
}
}
parameter[schema.value[0]] = utils::crypto::property_encryption::encrypt(parameter_value, encryption_provider);
}
}
}
void YamlFlowSerializer::encryptSensitiveProcessorProperties(YAML::Node& flow_definition_yaml, const core::ProcessGroup& process_group, const core::flow::FlowSchema& schema,
const utils::crypto::EncryptionProvider& encryption_provider, const std::unordered_map<utils::Identifier, core::flow::Overrides>& overrides) const {
for (auto processor_yaml : flow_definition_yaml[schema.processors[0]]) {
const auto processor_id = utils::Identifier::parse(processor_yaml[schema.identifier[0]].Scalar());
if (!processor_id) {
logger_->log_warn("Invalid processor ID found in the flow definition: {}", processor_yaml[schema.identifier[0]].Scalar());
continue;
}
const auto* processor = process_group.findProcessorById(*processor_id);
if (!processor) {
logger_->log_warn("Processor {} not found in the flow definition", processor_id->to_string());
continue;
}
const auto processor_overrides = overrides.contains(*processor_id) ? overrides.at(*processor_id) : core::flow::Overrides{};
encryptSensitiveProperties(processor_yaml[schema.processor_properties[0]], processor->getSupportedProperties(), encryption_provider, processor_overrides);
}
}
void YamlFlowSerializer::encryptSensitiveControllerServiceProperties(YAML::Node& flow_definition_yaml, const core::ProcessGroup& process_group, const core::flow::FlowSchema& schema,
const utils::crypto::EncryptionProvider& encryption_provider, const std::unordered_map<utils::Identifier, core::flow::Overrides>& overrides) const {
for (auto controller_service_yaml : flow_definition_yaml[schema.controller_services[0]]) {
const auto controller_service_id_str = controller_service_yaml[schema.identifier[0]].Scalar();
const auto controller_service_id = utils::Identifier::parse(controller_service_id_str);
if (!controller_service_id) {
logger_->log_warn("Invalid controller service ID found in the flow definition: {}", controller_service_id_str);
continue;
}
const auto* const controller_service_node = process_group.findControllerService(controller_service_id_str);
if (!controller_service_node) {
logger_->log_warn("Controller service node {} not found in the flow definition", controller_service_id_str);
continue;
}
const auto controller_service = controller_service_node->getControllerServiceImplementation();
if (!controller_service) {
logger_->log_warn("Controller service {} not found in the flow definition", controller_service_id_str);
continue;
}
const auto controller_service_overrides = overrides.contains(*controller_service_id) ? overrides.at(*controller_service_id) : core::flow::Overrides{};
encryptSensitiveProperties(controller_service_yaml[schema.controller_service_properties[0]], controller_service->getSupportedProperties(), encryption_provider, controller_service_overrides);
}
}
std::string YamlFlowSerializer::serialize(const core::ProcessGroup& process_group, const core::flow::FlowSchema& schema, const utils::crypto::EncryptionProvider& encryption_provider,
const std::unordered_map<utils::Identifier, core::flow::Overrides>& overrides,
const std::unordered_map<std::string, gsl::not_null<std::unique_ptr<ParameterContext>>>& parameter_contexts) const {
gsl_Expects(schema.identifier.size() == 1 &&
schema.processors.size() == 1 && schema.processor_properties.size() == 1 &&
schema.controller_services.size() == 1 && schema.controller_service_properties.size() == 1);
auto flow_definition_yaml = YAML::Clone(flow_definition_yaml_);
addProviderCreatedParameterContexts(flow_definition_yaml, schema, parameter_contexts);
encryptSensitiveParameters(flow_definition_yaml, schema, encryption_provider, overrides);
encryptSensitiveProcessorProperties(flow_definition_yaml, process_group, schema, encryption_provider, overrides);
encryptSensitiveControllerServiceProperties(flow_definition_yaml, process_group, schema, encryption_provider, overrides);
return YAML::Dump(flow_definition_yaml) + '\n';
}
} // namespace org::apache::nifi::minifi::core::yaml