blob: bc33be6fc766a925294eec76ca0c359c046e5211 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "QuerySplunkIndexingStatus.h"
#include <exception>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include "SplunkAttributes.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "client/HTTPClient.h"
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
namespace org::apache::nifi::minifi::extensions::splunk {
namespace {
/**
 * Path of the acknowledgement endpoint that is appended to the configured network location.
 */
constexpr std::string_view getEndpoint() {
  constexpr std::string_view ack_endpoint{"/services/collector/ack"};
  return ack_endpoint;
}
/**
 * Couples a flow file with the indexing status that was determined for it.
 */
struct FlowFileWithIndexStatus {
  /** Takes over the given (non-null) flow file; the indexing status starts out unset. */
  explicit FlowFileWithIndexStatus(gsl::not_null<std::shared_ptr<core::FlowFile>>&& incoming_flow_file)
      : flow_file_(std::move(incoming_flow_file)) {
  }

  // The flow file whose acknowledgement is being queried.
  gsl::not_null<std::shared_ptr<core::FlowFile>> flow_file_;
  // Empty until a boolean status for the flow file's ack id has been read from the response.
  std::optional<bool> indexing_status_{};
};
/**
 * Pulls up to batch_size flow files from the session and indexes them by their Splunk ack id.
 *
 * A flow file is transferred to the Failure relationship right away when
 *  - it has no SPLUNK_ACK_ID attribute,
 *  - its SPLUNK_ACK_ID attribute cannot be parsed by std::stoull, or
 *  - it shares its ack id with another flow file of the same batch (every flow file carrying
 *    the duplicated id is failed, including the one that was seen first).
 *
 * @param session session the flow files are fetched from and the rejected ones are transferred through
 * @param batch_size upper bound on the number of flow files fetched from the session
 * @return map from ack id to the flow file that still needs its indexing status determined
 */
std::unordered_map<uint64_t, FlowFileWithIndexStatus> getUndeterminedFlowFiles(core::ProcessSession& session, size_t batch_size) {
  std::unordered_map<uint64_t, FlowFileWithIndexStatus> undetermined_flow_files;
  std::unordered_set<uint64_t> duplicate_ack_ids;
  for (size_t i = 0; i < batch_size; ++i) {
    auto flow = session.get();
    if (flow == nullptr)
      break;
    const std::optional<std::string> splunk_ack_id_str = flow->getAttribute(SPLUNK_ACK_ID);
    if (!splunk_ack_id_str.has_value()) {
      session.transfer(flow, QuerySplunkIndexingStatus::Failure);
      continue;
    }
    uint64_t splunk_ack_id = 0;
    try {
      splunk_ack_id = std::stoull(*splunk_ack_id_str);
    } catch (const std::exception&) {
      // std::stoull throws std::invalid_argument / std::out_of_range on a malformed attribute.
      // Fail only the offending flow file instead of letting the exception abort the whole batch.
      session.transfer(flow, QuerySplunkIndexingStatus::Failure);
      continue;
    }
    if (undetermined_flow_files.contains(splunk_ack_id)) {
      // Remember the id so the flow file stored earlier under it can be failed as well.
      duplicate_ack_ids.insert(splunk_ack_id);
      session.transfer(flow, QuerySplunkIndexingStatus::Failure);
      continue;
    }
    undetermined_flow_files.emplace(std::make_pair(splunk_ack_id, gsl::not_null(std::move(flow))));
  }
  // The first holder of a duplicated ack id is still in the map: fail it and drop it from the result.
  for (auto duplicate_ack_id : duplicate_ack_ids) {
    session.transfer(undetermined_flow_files.at(duplicate_ack_id).flow_file_, QuerySplunkIndexingStatus::Failure);
    undetermined_flow_files.erase(duplicate_ack_id);
  }
  return undetermined_flow_files;
}
std::string getAckIdsAsPayload(const std::unordered_map<uint64_t, FlowFileWithIndexStatus>& undetermined_flow_files) {
rapidjson::Document payload = rapidjson::Document(rapidjson::kObjectType);
payload.AddMember("acks", rapidjson::kArrayType, payload.GetAllocator());
for (const auto& [ack_id, ff_status] : undetermined_flow_files) {
payload["acks"].PushBack(ack_id, payload.GetAllocator());
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
payload.Accept(writer);
return buffer.GetString();
}
/**
 * Submits the prepared request through the client and copies the boolean statuses found in the
 * response's "acks" object into the matching entries of undetermined_flow_files.
 *
 * The statuses are left unset when the request cannot be submitted, the response code is not 200,
 * the body is not a JSON object, or it has no "acks" object. Ack ids that are absent from the
 * response, or whose value is not a boolean, are left unset as well.
 *
 * @param client HTTP client the request is submitted with and the response is read from
 * @param undetermined_flow_files flow files keyed by ack id; indexing_status_ is updated in place
 */
void getIndexingStatusFromSplunk(curl::HTTPClient& client, std::unordered_map<uint64_t, FlowFileWithIndexStatus>& undetermined_flow_files) {
  if (!client.submit())
    return;
  if (client.getResponseCode() != 200)
    return;

  // Pass the length explicitly so the parser never depends on the body being null-terminated.
  const auto& response_body = client.getResponseBody();
  rapidjson::Document response;
  const rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(response_body.data(), response_body.size());
  // Member lookups are only valid on JSON objects, so reject any other root type up front.
  if (parse_result.IsError() || !response.IsObject())
    return;

  const auto acks_it = response.FindMember("acks");
  if (acks_it == response.MemberEnd() || !acks_it->value.IsObject())
    return;

  const rapidjson::Value& acks = acks_it->value;
  for (auto& [ack_id, ff_status] : undetermined_flow_files) {
    // The response keys the statuses by the decimal string form of the ack id.
    const std::string ack_id_str = std::to_string(ack_id);
    const auto status_it = acks.FindMember(ack_id_str.c_str());
    if (status_it != acks.MemberEnd() && status_it->value.IsBool())
      ff_status.indexing_status_ = status_it->value.GetBool();
  }
}
/**
 * Tells whether the flow file has been waiting for its acknowledgement for longer than max_age.
 *
 * The SPLUNK_RESPONSE_TIME attribute is interpreted as milliseconds since the system_clock epoch.
 * A flow file without that attribute, or with a value std::stoull cannot parse, counts as timed out.
 *
 * @param flow_file flow file whose SPLUNK_RESPONSE_TIME attribute is inspected
 * @param max_age maximum time allowed to pass after the recorded response time
 * @return true if the current time is past response time + max_age, or the response time is unusable
 */
bool flowFileAcknowledgementTimedOut(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, std::chrono::milliseconds max_age) {
  using std::chrono::system_clock;
  using std::chrono::milliseconds;
  const std::optional<std::string> splunk_response_time_str = flow_file->getAttribute(SPLUNK_RESPONSE_TIME);
  if (!splunk_response_time_str.has_value())
    return true;
  uint64_t splunk_response_time = 0;
  try {
    splunk_response_time = std::stoull(*splunk_response_time_str);
  } catch (const std::exception&) {
    // A malformed timestamp is handled like a missing one rather than throwing while flow files are being routed.
    return true;
  }
  return system_clock::now() > system_clock::time_point() + milliseconds(splunk_response_time) + max_age;
}
/**
 * Transfers every flow file to the relationship that matches its indexing status:
 *  - no status available          -> Failure
 *  - status is true               -> Acknowledged
 *  - status is false, timed out   -> Unacknowledged
 *  - status is false, still young -> penalized, then Undetermined
 *
 * @param session session used to penalize and transfer the flow files
 * @param flow_files_with_index_statuses flow files keyed by ack id together with their status
 * @param max_age waiting time after which a flow file with a false status counts as timed out
 */
void routeFlowFilesBasedOnIndexingStatus(core::ProcessSession& session,
    const std::unordered_map<uint64_t, FlowFileWithIndexStatus>& flow_files_with_index_statuses,
    std::chrono::milliseconds max_age) {
  for (const auto& entry : flow_files_with_index_statuses) {
    const FlowFileWithIndexStatus& candidate = entry.second;
    const auto& flow_file = candidate.flow_file_;

    if (!candidate.indexing_status_) {
      session.transfer(flow_file, QuerySplunkIndexingStatus::Failure);
      continue;
    }
    if (*candidate.indexing_status_) {
      session.transfer(flow_file, QuerySplunkIndexingStatus::Acknowledged);
      continue;
    }
    if (flowFileAcknowledgementTimedOut(flow_file, max_age)) {
      session.transfer(flow_file, QuerySplunkIndexingStatus::Unacknowledged);
      continue;
    }
    // Not indexed yet, but still within the allowed waiting time.
    session.penalize(flow_file);
    session.transfer(flow_file, QuerySplunkIndexingStatus::Undetermined);
  }
}
} // namespace
/**
 * Declares the properties and relationships this processor supports by passing the
 * class's Properties and Relationships collections to the corresponding setters.
 */
void QuerySplunkIndexingStatus::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Reads the processor configuration and prepares the HTTP client for the acknowledgement endpoint.
 *
 * After delegating to SplunkHECProcessor::onSchedule this
 *  - stores MaximumWaitingTime (in milliseconds) in max_age_ when the property yields a value,
 *  - reads MaxQuerySize into batch_size_,
 *  - initializes client_ with the network location extended by the ack endpoint path.
 *
 * @param context processor context the properties are read from; must not be null
 * @param sessionFactory forwarded to the base class; must not be null
 */
void QuerySplunkIndexingStatus::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
  gsl_Expects(context && sessionFactory);
  SplunkHECProcessor::onSchedule(context, sessionFactory);
  // max_age_ keeps its previous value when the property cannot be read as a time period.
  if (auto max_age = context->getProperty<core::TimePeriodValue>(MaximumWaitingTime)) {
    max_age_ = max_age->getMilliseconds();
  }
  context->getProperty(MaxQuerySize, batch_size_);
  initializeClient(client_, getNetworkLocation().append(getEndpoint()), getSSLContextService(*context));
}
/**
 * Processes one batch of flow files: collects up to batch_size_ flow files with usable ack ids,
 * posts their ack ids through client_, and routes each flow file according to the returned status.
 * Returns without sending a request when no flow file with a usable ack id was collected.
 *
 * @param context processor context; must not be null
 * @param session session the flow files are fetched from and transferred through; must not be null
 */
void QuerySplunkIndexingStatus::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
  gsl_Expects(context && session);
  auto undetermined_flow_files = getUndeterminedFlowFiles(*session, batch_size_);
  if (undetermined_flow_files.empty())
    return;
  client_.setPostFields(getAckIdsAsPayload(undetermined_flow_files));
  getIndexingStatusFromSplunk(client_, undetermined_flow_files);
  routeFlowFilesBasedOnIndexingStatus(*session, undetermined_flow_files, max_age_);
}
// Registers QuerySplunkIndexingStatus as a Processor resource.
// NOTE(review): the macro is presumably provided by core/Resource.h — confirm.
REGISTER_RESOURCE(QuerySplunkIndexingStatus, Processor);
} // namespace org::apache::nifi::minifi::extensions::splunk