blob: db91ee6a7fe70648844e79c4b2b167e241837a6d [file] [log] [blame]
/**
* @file AttributesToJSON.cpp
* AttributesToJSON class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AttributesToJSON.h"
#include <unordered_set>
#include "rapidjson/writer.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
#include "range/v3/algorithm/find.hpp"
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
#include "core/Resource.h"
namespace org::apache::nifi::minifi::processors {
void AttributesToJSON::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void AttributesToJSON::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
attribute_list_ = context.getProperty(AttributesList)
| utils::transform([](const auto attributes_list_str) { return utils::string::splitAndTrimRemovingEmpty(attributes_list_str, ","); })
| utils::valueOrElse([] { return std::vector<std::string>{}; });
attributes_regular_expression_ = context.getProperty(AttributesRegularExpression)
| utils::transform([](const auto s) { return utils::Regex{s}; })
| utils::toOptional();
write_destination_ = utils::parseEnumProperty<attributes_to_json::WriteDestination>(context, Destination);
include_core_attributes_ = context.getProperty(IncludeCoreAttributes)
| utils::andThen(parsing::parseBool)
| utils::orThrow("AttributesToJSON::IncludeCoreAttributes should be available in onSchedule");
null_value_ = context.getProperty(NullValue)
| utils::andThen(parsing::parseBool)
| utils::orThrow("AttributesToJSON::NullValue should be available in onSchedule");
}
bool AttributesToJSON::isCoreAttributeToBeFiltered(const std::string& attribute) const {
const auto& special_attributes = core::SpecialFlowAttribute::getSpecialFlowAttributes();
return !include_core_attributes_ && ranges::find(special_attributes, attribute) != ranges::end(special_attributes);
}
std::optional<std::unordered_set<std::string>> AttributesToJSON::getAttributesToBeWritten(const core::FlowFile::AttributeMap& flowfile_attributes) const {
if (attribute_list_.empty() && !attributes_regular_expression_) {
return std::nullopt;
}
std::unordered_set<std::string> attributes;
for (const auto& attribute : attribute_list_) {
attributes.insert(attribute);
}
if (attributes_regular_expression_) {
for (const auto& [key, value] : flowfile_attributes) {
if (utils::regexMatch(key, attributes_regular_expression_.value())) {
attributes.insert(key);
}
}
}
return attributes;
}
void AttributesToJSON::addAttributeToJson(rapidjson::Document& document, const std::string& key, const std::optional<std::string>& value) const {
rapidjson::Value json_key(key.c_str(), document.GetAllocator());
rapidjson::Value json_val;
if (value || !null_value_) {
json_val.SetString(value ? value->c_str() : "", document.GetAllocator());
}
document.AddMember(json_key, json_val, document.GetAllocator());
}
std::string AttributesToJSON::buildAttributeJsonData(const core::FlowFile::AttributeMap& flowfile_attributes) {
auto root = rapidjson::Document(rapidjson::kObjectType);
if (auto attributes_to_write = getAttributesToBeWritten(flowfile_attributes); attributes_to_write) {
for (const auto& key : *attributes_to_write) {
auto it = flowfile_attributes.find(key);
addAttributeToJson(root, key, it == flowfile_attributes.end() ? std::nullopt : std::make_optional(it->second));
}
} else {
for (const auto& [key, value] : flowfile_attributes) {
if (!isCoreAttributeToBeFiltered(key)) {
addAttributeToJson(root, key, value);
}
}
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
root.Accept(writer);
return buffer.GetString();
}
void AttributesToJSON::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
auto flow_file = session.get();
if (!flow_file) {
return;
}
auto json_data = buildAttributeJsonData(*flow_file->getAttributesPtr());
if (write_destination_ == attributes_to_json::WriteDestination::FLOWFILE_ATTRIBUTE) {
logger_->log_debug("Writing the following attribute data to JSONAttributes attribute: {}", json_data);
session.putAttribute(*flow_file, "JSONAttributes", json_data);
session.transfer(flow_file, Success);
} else {
logger_->log_debug("Writing the following attribute data to flowfile: {}", json_data);
session.writeBuffer(flow_file, json_data);
session.transfer(flow_file, Success);
}
}
REGISTER_RESOURCE(AttributesToJSON, Processor);
} // namespace org::apache::nifi::minifi::processors