blob: f328078ac7c2dc4dd49f79a4f03cbcbf305a90d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PushGrafanaLoki.h"
#include <exception>
#include <utility>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::extensions::grafana::loki {
// Appends a flow file to the pending batch.
// If a batch wait is configured and the batch is currently empty, the current time becomes the
// batch's start push time and is stored through the state manager under the "start_push_time" key
// as milliseconds since the epoch.
void PushGrafanaLoki::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) {
  gsl_Expects(state_manager_);
  const bool opens_timed_batch = log_line_batch_wait_ && batched_flowfiles_.empty();
  if (opens_timed_batch) {
    start_push_time_ = std::chrono::system_clock::now();
    const auto since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch());
    std::unordered_map<std::string, std::string> persisted_state;
    persisted_state["start_push_time"] = std::to_string(since_epoch.count());
    logger_->log_debug("Saved start push time to state: {}", persisted_state["start_push_time"]);
    state_manager_->set(persisted_state);
  }
  batched_flowfiles_.push_back(flowfile);
}
// Puts a flow file back into the pending batch.
// Unlike add(), this neither touches the start push time nor writes to the state manager.
void PushGrafanaLoki::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) {
  this->batched_flowfiles_.emplace_back(flowfile);
}
// Empties the pending batch and returns the flow files it contained.
// The start push time is reset to its default value; if a batch wait is configured, the persisted
// "start_push_time" state entry is reset to "0" as well.
std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLoki::LogBatch::flush() {
  gsl_Expects(state_manager_);
  start_push_time_ = {};
  // Move the accumulated flow files out instead of copying the vector and clearing the source:
  // this avoids one allocation plus a reference count increment/decrement per flow file.
  auto result = std::exchange(batched_flowfiles_, {});
  if (log_line_batch_wait_) {
    std::unordered_map<std::string, std::string> state;
    logger_->log_debug("Reset start push time state");
    state["start_push_time"] = "0";
    state_manager_->set(state);
  }
  return result;
}
// Tells whether the batch should be sent now.
// Returns true when a batch size is configured and at least that many flow files are pending, or
// when a batch wait is configured and at least that much time has passed since the start push time.
// With neither limit configured this always returns false.
bool PushGrafanaLoki::LogBatch::isReady() const {
  if (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) {
    return true;
  }
  if (!log_line_batch_wait_) {
    return false;
  }
  const auto elapsed = std::chrono::system_clock::now() - start_push_time_;
  return elapsed >= *log_line_batch_wait_;
}
void PushGrafanaLoki::LogBatch::setLogLineBatchSize(std::optional<uint64_t> log_line_batch_size) {
log_line_batch_size_ = log_line_batch_size;
}
void PushGrafanaLoki::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) {
log_line_batch_wait_ = log_line_batch_wait;
}
// Stores the (non-owning) state manager pointer used by add() and flush() to write the start push time.
// Both add() and flush() require this pointer to be non-null.
void PushGrafanaLoki::LogBatch::setStateManager(core::StateManager* state_manager) {
  this->state_manager_ = state_manager;
}
void PushGrafanaLoki::LogBatch::setStartPushTime(std::chrono::system_clock::time_point start_push_time) {
start_push_time_ = start_push_time;
}
// Relationship that onTrigger uses for incoming flow files which were added to the log batch but
// have not been sent yet: they are transferred to "__self__" instead of Success or Failure.
const core::Relationship PushGrafanaLoki::Self("__self__", "Marks the FlowFile to be owned by this processor");
// Looks up the controller service named by the SSLContextService property.
// Returns an empty pointer when the property is not set, or when the referenced controller service
// cannot be cast to SSLContextServiceInterface.
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> PushGrafanaLoki::getSSLContextService(core::ProcessContext& context) const {
  const auto service_name = context.getProperty(PushGrafanaLoki::SSLContextService);
  if (!service_name) {
    return std::shared_ptr<minifi::controllers::SSLContextServiceInterface>{};
  }
  auto controller_service = context.getControllerService(*service_name, getUUID());
  return std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(controller_service);
}
// Fetches the processor's state manager, hands it to the log batch, and restores the batch's start
// push time from the persisted "start_push_time" entry (milliseconds since the epoch) if present.
// Throws a PROCESSOR_EXCEPTION when no state manager is available.
// A persisted value that cannot be parsed as an integer is logged and ignored, so a corrupted state
// entry does not prevent the processor from being scheduled.
void PushGrafanaLoki::setUpStateManager(core::ProcessContext& context) {
  auto state_manager = context.getStateManager();
  if (state_manager == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }
  log_batch_.setStateManager(state_manager);

  std::unordered_map<std::string, std::string> state_map;
  if (!state_manager->get(state_map)) {
    return;
  }
  const auto it = state_map.find("start_push_time");
  if (it == state_map.end()) {
    return;
  }

  long long start_push_time_ms = 0;
  try {
    // std::stoll throws std::invalid_argument / std::out_of_range on malformed or oversized input
    start_push_time_ms = std::stoll(it->second);
  } catch (const std::exception& ex) {
    logger_->log_error("Ignoring invalid start push time '{}' found in processor state: {}", it->second, ex.what());
    return;
  }

  logger_->log_info("Restored start push time from processor state: {}", it->second);
  std::chrono::system_clock::time_point start_push_time{std::chrono::milliseconds{start_push_time_ms}};
  log_batch_.setStartPushTime(start_push_time);
}
// Parses the StreamLabels property into a label name -> label value map.
// The property is split on ',' into entries and each entry on '=' into exactly a name and a value
// (parts are trimmed and empty parts are dropped); a repeated name keeps the last value.
// Throws a PROCESS_SCHEDULE_EXCEPTION when the property is not set, yields no entries, or contains
// an entry that does not split into exactly two parts.
std::map<std::string, std::string> PushGrafanaLoki::buildStreamLabelMap(core::ProcessContext& context) {
  constexpr auto invalid_property_message = "Missing or invalid Stream Labels property";

  const auto raw_property = context.getProperty(StreamLabels);
  if (!raw_property) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, invalid_property_message);
  }

  const auto label_entries = utils::string::splitAndTrimRemovingEmpty(*raw_property, ",");
  if (label_entries.empty()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, invalid_property_message);
  }

  std::map<std::string, std::string> labels_by_name;
  for (const auto& entry : label_entries) {
    const auto name_and_value = utils::string::splitAndTrimRemovingEmpty(entry, "=");
    if (name_and_value.size() != 2) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, invalid_property_message);
    }
    labels_by_name.insert_or_assign(name_and_value[0], name_and_value[1]);
  }
  return labels_by_name;
}
// Reads the processor configuration and prepares the log batch.
// Sets up the state manager and stream labels, then reads the LogLineMetadataAttributes,
// LogLineBatchWait, LogLineBatchSize and MaxBatchSize properties.
// Throws a PROCESS_SCHEDULE_EXCEPTION when LogLineBatchSize is set to a value less than 1.
// Every optional setting is overwritten on each call (including being cleared when its property is
// not set), so values from a previous scheduling of the same processor instance do not linger.
void PushGrafanaLoki::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  setUpStateManager(context);
  setUpStreamLabels(context);

  if (auto log_line_metadata_attributes = context.getProperty(LogLineMetadataAttributes)) {
    log_line_metadata_attributes_ = utils::string::splitAndTrimRemovingEmpty(*log_line_metadata_attributes, ",");
  } else {
    // drop attributes configured by an earlier schedule
    log_line_metadata_attributes_.clear();
  }

  auto log_line_batch_wait = utils::parseOptionalDurationProperty(context, LogLineBatchWait);
  auto log_line_batch_size = utils::parseOptionalU64Property(context, LogLineBatchSize);
  if (log_line_batch_size && *log_line_batch_size < 1) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Log Line Batch Size property is missing or less than 1!");
  }
  log_line_batch_size_is_set_ = log_line_batch_size.has_value();
  log_line_batch_wait_is_set_ = log_line_batch_wait.has_value();

  max_batch_size_ = utils::parseOptionalU64Property(context, MaxBatchSize);
  if (max_batch_size_) {
    logger_->log_debug("PushGrafanaLoki Max Batch Size is set to: {}", *max_batch_size_);
  }

  log_batch_.setLogLineBatchSize(log_line_batch_size);
  if (log_line_batch_size) {
    logger_->log_debug("PushGrafanaLoki Log Line Batch Size is set to: {}", *log_line_batch_size);
  }

  if (log_line_batch_wait) {
    log_batch_.setLogLineBatchWait(*log_line_batch_wait);
    logger_->log_debug("PushGrafanaLoki Log Line Batch Wait is set to {} milliseconds", *log_line_batch_wait);
  } else {
    // keep the batch consistent with log_line_batch_wait_is_set_: without this a wait configured by
    // an earlier schedule would still drive LogBatch::isReady()
    log_batch_.setLogLineBatchWait(std::nullopt);
  }
}
// Submits the given flow files to Loki as one request and routes them according to the outcome:
// all of them go to Success when the request succeeds, all of them go to Failure otherwise.
// An empty batch is ignored without submitting anything.
void PushGrafanaLoki::processBatch(const std::vector<std::shared_ptr<core::FlowFile>>& batched_flow_files, core::ProcessSession& session) {
  if (batched_flow_files.empty()) {
    return;
  }

  const auto route_all_to = [&](const auto& relationship) {
    for (const auto& batched_flow_file : batched_flow_files) {
      session.transfer(batched_flow_file, relationship);
    }
  };

  auto request_outcome = submitRequest(batched_flow_files, session);
  if (request_outcome) {
    logger_->log_debug("Successfully sent log batch with {} log lines to Loki", batched_flow_files.size());
    route_all_to(Success);
    return;
  }

  logger_->log_error("Failed to send log batch to Loki: {}", request_outcome.error());
  route_all_to(Failure);
}
// Pulls incoming flow files from the session into the log batch and sends the batch when it is due.
// - At most max_batch_size_ flow files are read per trigger (no limit if it is unset or 0).
// - Whenever the batch reports ready during reading, it is flushed and sent immediately.
// - After reading: with no batch size and no batch wait configured, everything batched is sent;
//   otherwise flow files read in this trigger but not sent are transferred to Self; if nothing was
//   read, a due time-limited batch is sent, or the processor yields when no batch wait is configured.
void PushGrafanaLoki::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  std::vector<std::shared_ptr<core::FlowFile>> unsent_from_this_trigger;
  uint64_t read_count = 0;

  const auto read_limit_reached = [&] {
    return max_batch_size_ && *max_batch_size_ != 0 && read_count >= *max_batch_size_;
  };
  const auto send = [&](const std::vector<std::shared_ptr<core::FlowFile>>& batch) {
    logger_->log_debug("Sending {} log lines to Loki", batch.size());
    processBatch(batch, session);
  };

  for (; !read_limit_reached(); ++read_count) {
    const std::shared_ptr<core::FlowFile> incoming = session.get();
    if (!incoming) {
      break;
    }
    unsent_from_this_trigger.push_back(incoming);
    logger_->log_debug("Enqueuing flow file {} to be sent to Loki", incoming->getUUIDStr());
    log_batch_.add(incoming);

    // without a batch size or batch wait limit the batch never reports ready here
    if (!log_batch_.isReady()) {
      continue;
    }

    const auto ready_batch = log_batch_.flush();
    // Flow files batched in earlier triggers precede the ones read now; only those have to be added
    // to this session, so stop at the first flow file that was read in this trigger.
    const auto& first_read_now = unsent_from_this_trigger.front();
    for (auto it = ready_batch.begin(); it != ready_batch.end() && !(first_read_now == *it); ++it) {
      session.add(*it);
    }
    send(ready_batch);
    unsent_from_this_trigger.clear();
  }

  // no batch limit at all: the batch holds every flow file that should go out in this trigger
  if (!log_line_batch_size_is_set_ && !log_line_batch_wait_is_set_) {
    send(log_batch_.flush());
    return;
  }

  // flow files were read: take ownership of the incoming ones that have not been transferred yet
  if (read_count != 0) {
    for (const auto& unsent : unsent_from_this_trigger) {
      session.transfer(unsent, Self);
    }
    return;
  }

  // nothing was read and there is no batch wait that could expire: nothing to do for now
  if (!log_line_batch_wait_is_set_) {
    context.yield();
    return;
  }

  // nothing was read, but a batch wait is configured: send the batch if its wait time is over
  if (!log_batch_.isReady()) {
    return;
  }
  const auto expired_batch = log_batch_.flush();
  for (const auto& batched : expired_batch) {
    session.add(batched);
  }
  send(expired_batch);
}
// Puts a flow file handed back to the processor into the log batch again.
// Null flow files are ignored.
void PushGrafanaLoki::restore(const std::shared_ptr<core::FlowFile>& flow_file) {
  if (flow_file) {
    logger_->log_debug("Restoring flow file {} from flow file repository", flow_file->getUUIDStr());
    log_batch_.restore(flow_file);
  }
}
} // namespace org::apache::nifi::minifi::extensions::grafana::loki