blob: ca6e1c747d1b0e939598f43b409ad76e868f581c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PostElasticsearch.h"
#include <utility>
#include <vector>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "rapidjson/document.h"
#include "rapidjson/stream.h"
#include "rapidjson/writer.h"
#include "utils/JsonCallback.h"
#include "utils/expected.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::extensions::elasticsearch {
void PostElasticsearch::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void PostElasticsearch::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
max_batch_size_ = utils::parseU64Property(context, MaxBatchSize);
if (max_batch_size_ < 1)
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
if (auto hosts_str = context.getProperty(Hosts)) {
auto hosts = utils::string::split(*hosts_str, ",");
if (hosts.size() > 1)
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
host_url_ = hosts[0];
} else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
}
credentials_service_ = utils::parseOptionalControllerService<ElasticsearchCredentialsControllerService>(context, PostElasticsearch::ElasticCredentials, getUUID());
if (!credentials_service_)
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, PostElasticsearch::SSLContext, getUUID());
client_.initialize(http::HttpRequestMethod::POST, host_url_ + "/_bulk", ssl_context_service);
client_.setContentType("application/json");
credentials_service_->authenticateClient(client_);
}
namespace {
class ElasticPayload {
public:
[[nodiscard]] std::string toString() const {
auto result = headerString();
if (payload_) {
rapidjson::StringBuffer payload_buffer;
rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
payload_->Accept(payload_writer);
result = result + std::string("\n") + payload_buffer.GetString();
}
return result;
}
static nonstd::expected<ElasticPayload, std::string> parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
auto action = context.getProperty(PostElasticsearch::Action, flow_file.get());
if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
return nonstd::make_unexpected("Missing or invalid action");
auto index = context.getProperty(PostElasticsearch::Index, flow_file.get());
if (!index)
return nonstd::make_unexpected("Missing index");
auto id = context.getProperty(PostElasticsearch::Identifier, flow_file.get()) | utils::toOptional();
if (!id && (action == "delete" || action == "update" || action == "upsert"))
return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
std::optional<rapidjson::Document> payload;
if (action == "index" || action == "create") {
payload = rapidjson::Document(rapidjson::kObjectType);
utils::JsonInputCallback callback(*payload);
if (session.read(flow_file, std::ref(callback)) < 0) {
return nonstd::make_unexpected("invalid flowfile content");
}
}
if (action == "update" || action == "upsert") {
payload = rapidjson::Document(rapidjson::kObjectType);
rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
utils::JsonInputCallback callback(doc_member);
if (session.read(flow_file, std::ref(callback)) < 0) {
return nonstd::make_unexpected("invalid flowfile content");
}
if (action == "upsert") {
action = "update";
doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
}
payload->AddMember("doc", doc_member, payload->GetAllocator());
}
return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
}
private:
ElasticPayload(std::string operation,
std::string index,
std::optional<std::string> id,
std::optional<rapidjson::Document> payload) :
operation_(std::move(operation)),
index_(std::move(index)),
id_(std::move(id)),
payload_(std::move(payload)) {
}
[[nodiscard]] std::string headerString() const {
rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
auto operation_index_key = rapidjson::Value(operation_.data(), gsl::narrow<rapidjson::SizeType>(operation_.size()));
first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
auto& operation_request = first_line[operation_.c_str()];
auto index_json = rapidjson::Value(index_.data(), gsl::narrow<rapidjson::SizeType>(index_.size()));
operation_request.AddMember("_index", index_json, first_line.GetAllocator());
if (id_) {
auto id_json = rapidjson::Value(id_->data(), gsl::narrow<rapidjson::SizeType>(id_->size()));
operation_request.AddMember("_id", id_json, first_line.GetAllocator());
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
first_line.Accept(writer);
return buffer.GetString();
}
std::string operation_;
std::string index_;
std::optional<std::string> id_;
std::optional<rapidjson::Document> payload_;
};
nonstd::expected<rapidjson::Document, std::string> submitRequest(http::HTTPClient& client, const std::string& payload, const size_t expected_items) {
client.setPostFields(payload);
if (!client.submit())
return nonstd::make_unexpected("Submit failed");
auto response_code = client.getResponseCode();
if (response_code != 200)
return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + client.getResponseBody().data());
rapidjson::Document response;
rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
if (parse_result.IsError())
return nonstd::make_unexpected("Response is not valid json");
if (!response.HasMember("items"))
return nonstd::make_unexpected("Response is invalid");
if (response["items"].Size() != expected_items)
return nonstd::make_unexpected("The number of responses dont match the number of requests");
return response;
}
void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
name = name + "." + object->name.GetString();
if (object->value.IsObject()) {
for (auto it = object->value.MemberBegin(); it != object->value.MemberEnd(); ++it) {
addAttributesFromResponse(name, it, flow_file);
}
} else if (object->value.IsInt64()) {
flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
} else if (object->value.IsString()) {
flow_file.addAttribute(name, object->value.GetString());
} else if (object->value.IsBool()) {
flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
} else if (object->value.IsDouble()) {
flow_file.addAttribute(name, std::to_string(object->value.GetDouble()));
} else {
core::logging::LoggerFactory<PostElasticsearch>::getLogger()->log_error("Unexpected {} in response json", magic_enum::enum_underlying(object->value.GetType()));
}
}
void processResponseFromElastic(const rapidjson::Document& response, core::ProcessSession& session, const std::vector<std::shared_ptr<core::FlowFile>>& flowfiles_sent) {
gsl_Expects(response.HasMember("items"));
auto& items = response["items"];
gsl_Expects(items.IsArray());
gsl_Expects(items.Size() == flowfiles_sent.size());
for (rapidjson::SizeType i = 0; i < items.Size(); ++i) {
gsl_Expects(items[i].IsObject());
for (auto it = items[i].MemberBegin(); it != items[i].MemberEnd(); ++it) {
addAttributesFromResponse("elasticsearch", it, *flowfiles_sent[i]);
}
if (items[i].MemberBegin()->value.HasMember("error"))
session.transfer(flowfiles_sent[i], PostElasticsearch::Error);
else
session.transfer(flowfiles_sent[i], PostElasticsearch::Success);
}
}
} // namespace
std::string PostElasticsearch::collectPayload(core::ProcessContext& context,
core::ProcessSession& session,
std::vector<std::shared_ptr<core::FlowFile>>& flowfiles_with_payload) const {
std::stringstream payload;
for (size_t flow_files_processed = 0; flow_files_processed < max_batch_size_; ++flow_files_processed) {
auto flow_file = session.get();
if (!flow_file)
break;
auto elastic_payload = ElasticPayload::parse(session, context, flow_file);
if (!elastic_payload) {
logger_->log_error("{}", elastic_payload.error());
session.transfer(flow_file, PostElasticsearch::Failure);
continue;
}
payload << elastic_payload->toString() << "\n";
flowfiles_with_payload.push_back(flow_file);
}
return payload.str();
}
void PostElasticsearch::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
gsl_Expects(max_batch_size_ > 0);
std::vector<std::shared_ptr<core::FlowFile>> flowfiles_with_payload;
auto payload = collectPayload(context, session, flowfiles_with_payload);
if (flowfiles_with_payload.empty()) {
return;
}
auto result = submitRequest(client_, payload, flowfiles_with_payload.size());
if (!result) {
logger_->log_error("{}", result.error());
for (const auto& flow_file_in_payload: flowfiles_with_payload)
session.transfer(flow_file_in_payload, Failure);
return;
}
processResponseFromElastic(*result, session, flowfiles_with_payload);
}
REGISTER_RESOURCE(PostElasticsearch, Processor);
} // namespace org::apache::nifi::minifi::extensions::elasticsearch