blob: 11533abf370425ee6204f7689ec12721b966dec2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "EvaluateJsonPath.h"
#include <unordered_map>
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
#include "jsoncons_ext/jsonpath/jsonpath.hpp"
namespace org::apache::nifi::minifi::processors {
namespace {
// Reports whether `value` is neither a JSON array nor a JSON object.
bool isScalar(const jsoncons::json& value) {
  const bool is_container = value.is_array() || value.is_object();
  return !is_container;
}
// Returns true when the query result has no elements, or when it has exactly one element
// and that element passes isScalar().
bool isQueryResultEmptyOrScalar(const jsoncons::json& query_result) {
  if (query_result.empty()) {
    return true;
  }
  if (query_result.size() != 1) {
    return false;
  }
  return isScalar(query_result[0]);
}
} // namespace
void EvaluateJsonPath::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
// Validates the dynamic-property configuration and caches the processor settings in member fields.
//
// Throws Exception(PROCESS_SCHEDULE_EXCEPTION) when no dynamic property is configured, or when more
// than one is configured while the destination is FlowFileContent.
// A configured return type of AutoDetect is resolved here: JSON when the destination is
// FlowFileContent, Scalar for any other destination.
void EvaluateJsonPath::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  const auto json_path_keys = context.getDynamicPropertyKeys();
  if (json_path_keys.empty()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "At least one dynamic property must be specified with a valid JSON path expression");
  }

  destination_ = utils::parseEnumProperty<evaluate_json_path::DestinationType>(context, EvaluateJsonPath::Destination);
  const bool writes_to_content = destination_ == evaluate_json_path::DestinationType::FlowFileContent;
  if (writes_to_content && json_path_keys.size() > 1) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one dynamic property is allowed for JSON path when destination is set to flowfile-content");
  }

  null_value_representation_ = utils::parseEnumProperty<evaluate_json_path::NullValueRepresentationOption>(context, EvaluateJsonPath::NullValueRepresentation);
  path_not_found_behavior_ = utils::parseEnumProperty<evaluate_json_path::PathNotFoundBehaviorOption>(context, EvaluateJsonPath::PathNotFoundBehavior);
  return_type_ = utils::parseEnumProperty<evaluate_json_path::ReturnTypeOption>(context, EvaluateJsonPath::ReturnType);

  // An explicitly configured return type is kept as-is; only AutoDetect gets resolved.
  if (return_type_ != evaluate_json_path::ReturnTypeOption::AutoDetect) {
    return;
  }
  return_type_ = writes_to_content
      ? evaluate_json_path::ReturnTypeOption::JSON
      : evaluate_json_path::ReturnTypeOption::Scalar;
}
// Converts a JSON path query result into the string that gets written out.
//
// Precondition (gsl_Expects): `query_result` is not empty.
//  - More than one element: the whole result is serialized with to_string(); the return type is
//    asserted to be JSON in that case.
//  - Single null element: "" or "null", depending on null_value_representation_.
//  - Single string element: its string value obtained via as<std::string>().
//  - Any other single element: its to_string() serialization.
std::string EvaluateJsonPath::extractQueryResult(const jsoncons::json& query_result) const {
  gsl_Expects(!query_result.empty());

  const bool has_multiple_values = query_result.size() > 1;
  if (!has_multiple_values) {
    const auto& single_value = query_result[0];
    if (single_value.is_null()) {
      if (null_value_representation_ == evaluate_json_path::NullValueRepresentationOption::EmptyString) {
        return "";
      }
      return "null";
    }
    if (single_value.is_string()) {
      return single_value.as<std::string>();
    }
    return single_value.to_string();
  }

  gsl_Assert(return_type_ == evaluate_json_path::ReturnTypeOption::JSON);
  return query_result.to_string();
}
// Stores the extracted query result according to destination_.
//
// When the destination is FlowFileContent the extracted string is written to the flow file through
// session.write(); for any other destination it is added to `attributes_to_set` under `property_name`
// (emplace, so a key that is already present keeps its existing value).
void EvaluateJsonPath::writeQueryResult(core::ProcessSession& session, core::FlowFile& flow_file, const jsoncons::json& query_result, const std::string& property_name,
    std::unordered_map<std::string, std::string>& attributes_to_set) const {
  if (destination_ != evaluate_json_path::DestinationType::FlowFileContent) {
    attributes_to_set.emplace(property_name, extractQueryResult(query_result));
    return;
  }

  session.write(flow_file, [this, &query_result](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
    const auto content = extractQueryResult(query_result);
    const auto* content_bytes = reinterpret_cast<const uint8_t*>(content.data());
    return gsl::narrow<int64_t>(stream->write(content_bytes, content.size()));
  });
}
// Evaluates every configured JSON path (one per dynamic property) against the JSON content of the
// incoming flow file and routes the flow file according to the outcome.
//
// Routing:
//  - Failure:   the content is empty, the content cannot be parsed as JSON, a dynamic property cannot
//               be retrieved, a JSON path fails to evaluate, or the return type is Scalar and the
//               result is non-scalar or has more than one value.
//  - Unmatched: a path produced no result and the destination is FlowFileContent.
//  - Matched:   all paths were processed without hitting one of the cases above.
//
// When no flow file is available the context is yielded and nothing else happens.
//
// Attribute values (including the empty value recorded for a path that was not found) are collected
// first and only applied through the session once every path has been processed, so a flow file
// routed to Failure is not left with a partial set of attributes.
void EvaluateJsonPath::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  auto flow_file = session.get();
  if (!flow_file) {
    context.yield();
    logger_->log_debug("No FlowFile available, yielding");
    return;
  }

  const auto json_string = to_string(session.readBuffer(flow_file));
  if (json_string.empty()) {
    logger_->log_error("FlowFile content is empty, transferring to Failure relationship");
    session.transfer(flow_file, Failure);
    return;
  }

  jsoncons::json json_object;
  try {
    json_object = jsoncons::json::parse(json_string);
  } catch (const jsoncons::json_exception& e) {
    logger_->log_error("FlowFile content is not a valid JSON document, transferring to Failure relationship: {}", e.what());
    session.transfer(flow_file, Failure);
    return;
  }

  std::unordered_map<std::string, std::string> attributes_to_set;
  for (const auto& property_name : context.getDynamicPropertyKeys()) {
    const auto result = context.getRawDynamicProperty(property_name);
    if (!result) {
      logger_->log_error("Failed to retrieve dynamic property '{}' for FlowFile with UUID '{}', transferring to Failure relationship", property_name, flow_file->getUUIDStr());
      session.transfer(flow_file, Failure);
      return;
    }
    const auto& json_path = *result;

    jsoncons::json query_result;
    try {
      query_result = jsoncons::jsonpath::json_query(json_object, json_path);
    } catch (const jsoncons::jsonpath::jsonpath_error& e) {
      logger_->log_error("Invalid JSON path expression '{}' found for attribute key '{}': {}", json_path, property_name, e.what());
      session.transfer(flow_file, Failure);
      return;
    }

    // A non-array or empty result is treated as "path not found".
    if (!query_result.is_array() || query_result.empty()) {
      if (path_not_found_behavior_ == evaluate_json_path::PathNotFoundBehaviorOption::Warn) {
        logger_->log_warn("JSON path '{}' not found for attribute key '{}'", json_path, property_name);
      }
      if (destination_ == evaluate_json_path::DestinationType::FlowFileContent) {
        logger_->log_debug("JSON path '{}' not found for attribute key '{}', transferring to Unmatched relationship", json_path, property_name);
        session.transfer(flow_file, Unmatched);
        return;
      }
      if (path_not_found_behavior_ != evaluate_json_path::PathNotFoundBehaviorOption::Skip) {
        // Deferred like every other attribute instead of mutating the flow file right away:
        // a later path may still route the flow file to Failure, which must not carry partial results.
        attributes_to_set.emplace(property_name, "");
      }
      continue;
    }

    if (return_type_ == evaluate_json_path::ReturnTypeOption::Scalar && !isQueryResultEmptyOrScalar(query_result)) {
      logger_->log_error("JSON path '{}' returned a non-scalar value or multiple values for attribute key '{}', transferring to Failure relationship", json_path, property_name);
      session.transfer(flow_file, Failure);
      return;
    }

    writeQueryResult(session, *flow_file, query_result, property_name, attributes_to_set);
  }

  // Every path was processed; apply the collected attributes in one go.
  if (destination_ == evaluate_json_path::DestinationType::FlowFileAttribute) {
    for (const auto& [key, value] : attributes_to_set) {
      session.putAttribute(*flow_file, key, value);
    }
  }
  session.transfer(flow_file, Matched);
}
// NOTE(review): presumably registers EvaluateJsonPath as a Processor resource with the framework;
// the macro is not defined in this file — confirm against core/Resource.h.
REGISTER_RESOURCE(EvaluateJsonPath, Processor);
} // namespace org::apache::nifi::minifi::processors