blob: eacbf0001b4cb6c6a4e870e66a6119d9b251a661 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SplitJson.h"
#include <unordered_map>
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/Id.h"
#include "jsoncons_ext/jsonpath/jsonpath.hpp"
namespace org::apache::nifi::minifi::processors {
void SplitJson::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void SplitJson::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
json_path_expression_ = utils::parseProperty(context, SplitJson::JsonPathExpression);
null_value_representation_ = utils::parseEnumProperty<split_json::NullValueRepresentationOption>(context, SplitJson::NullValueRepresentation);
}
std::optional<jsoncons::json> SplitJson::queryArrayUsingJsonPath(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file) const {
const auto json_string = to_string(session.readBuffer(flow_file));
if (json_string.empty()) {
logger_->log_error("FlowFile content is empty, transferring to the 'failure' relationship");
return std::nullopt;
}
jsoncons::json json_object;
try {
json_object = jsoncons::json::parse(json_string);
} catch (const jsoncons::json_exception& e) {
logger_->log_error("FlowFile content is not a valid JSON document, transferring to the 'failure' relationship: {}", e.what());
return std::nullopt;
}
jsoncons::json query_result;
try {
query_result = jsoncons::jsonpath::json_query(json_object, json_path_expression_);
} catch (const jsoncons::jsonpath::jsonpath_error& e) {
logger_->log_error("Invalid JSON path expression '{}' set in the 'JsonPath Expression' property: {}", json_path_expression_, e.what());
return std::nullopt;
}
return query_result;
}
std::string SplitJson::jsonValueToString(const jsoncons::json& json_value) const {
if (json_value.is_null()) {
return null_value_representation_ == split_json::NullValueRepresentationOption::EmptyString ? "" : "null";
}
if (json_value.is_string()) {
return json_value.as<std::string>();
}
return json_value.to_string();
}
void SplitJson::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
auto flow_file = session.get();
if (!flow_file) {
context.yield();
return;
}
auto query_result = queryArrayUsingJsonPath(session, flow_file);
if (!query_result) {
session.transfer(flow_file, Failure);
return;
}
gsl_Assert(query_result->is_array());
if (query_result->empty()) {
logger_->log_error("JSON Path expression '{}' did not match the input flow file content, transferring to the 'failure' relationship", json_path_expression_);
session.transfer(flow_file, Failure);
return;
}
if (query_result->size() == 1 && !query_result.value()[0].is_array()) {
logger_->log_error("JSON Path expression '{}' did not return an array, transferring to the 'failure' relationship", json_path_expression_);
session.transfer(flow_file, Failure);
return;
}
const auto fragment_id = utils::IdGenerator::getIdGenerator()->generate().to_string();
const jsoncons::json& result_array = query_result->size() == 1 ? query_result.value()[0] : *query_result;
const auto original_filename = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME);
for (size_t i = 0; i < result_array.size(); ++i) {
auto child_flow_file = session.create(flow_file.get());
child_flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, child_flow_file->getUUIDStr());
child_flow_file->setAttribute(SplitJson::FragmentIndex.name, std::to_string(i));
child_flow_file->setAttribute(SplitJson::FragmentCount.name, std::to_string(result_array.size()));
child_flow_file->setAttribute(SplitJson::FragmentIdentifier.name, fragment_id);
child_flow_file->setAttribute(SplitJson::SegmentOriginalFilename.name, original_filename ? original_filename.value() : "");
auto& json_value_to_write = result_array[i];
session.write(child_flow_file, [this, &json_value_to_write](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
auto result_string = jsonValueToString(json_value_to_write);
return gsl::narrow<int64_t>(output_stream->write(reinterpret_cast<const uint8_t*>(result_string.data()), result_string.size()));
});
session.transfer(child_flow_file, Split);
}
flow_file->setAttribute(SplitJson::FragmentIdentifier.name, fragment_id);
flow_file->setAttribute(SplitJson::FragmentCount.name, std::to_string(result_array.size()));
session.transfer(flow_file, Original);
}
REGISTER_RESOURCE(SplitJson, Processor);
} // namespace org::apache::nifi::minifi::processors