blob: 66cd7c040a1f117f7aec91b112e5d4237234e83c [file] [log] [blame]
/**
* @file LogAttribute.cpp
* LogAttribute class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LogAttribute.h"
#include <cstring>
#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <sstream>
#include <iostream>
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
void LogAttribute::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void LogAttribute::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
flowfiles_to_log_ = utils::parseU64Property(context, FlowFilesToLog);
logger_->log_debug("FlowFiles To Log: {}", flowfiles_to_log_);
hexencode_ = utils::parseBoolProperty(context, HexencodePayload);
max_line_length_ = utils::parseU64Property(context, MaxPayloadLineLength);
logger_->log_debug("Maximum Payload Line Length: {}", max_line_length_);
if (auto attributes_to_log_str = context.getProperty(AttributesToLog)) {
if (auto attrs_to_log_vec = utils::string::split(*attributes_to_log_str, ","); !attrs_to_log_vec.empty())
attributes_to_log_.emplace(std::make_move_iterator(attrs_to_log_vec.begin()), std::make_move_iterator(attrs_to_log_vec.end()));
}
if (auto attributes_to_ignore_str = context.getProperty(AttributesToIgnore)) {
if (auto attrs_to_ignore_vec = utils::string::split(*attributes_to_ignore_str, ","); !attrs_to_ignore_vec.empty())
attributes_to_ignore_.emplace(std::make_move_iterator(attrs_to_ignore_vec.begin()), std::make_move_iterator(attrs_to_ignore_vec.end()));
}
if (auto log_level_str = context.getProperty(LogLevel)) {
if (auto result = magic_enum::enum_cast<core::logging::LOG_LEVEL>(*log_level_str)) {
log_level_ = *result;
} else if (*log_level_str == "error") { // TODO(MINIFICPP-2294) this could be avoided if config files were properly migrated
log_level_ = core::logging::err;
}
}
if (auto log_prefix = context.getProperty(LogPrefix)) {
dash_line_ = fmt::format("{:-^50}", *log_prefix);
}
log_payload_ = utils::parseBoolProperty(context, LogPayload);
}
std::string LogAttribute::generateLogMessage(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file) const {
std::ostringstream message;
message << "Logging for flow file" << "\n";
message << dash_line_;
message << "\nStandard FlowFile Attributes";
message << "\n" << "UUID:" << flow_file->getUUIDStr();
message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow_file->getEntryDate());
message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow_file->getlineageStartDate());
message << "\n" << "Size:" << flow_file->getSize() << " Offset:" << flow_file->getOffset();
message << "\nFlowFile Attributes Map Content";
for (const auto& [attr_key, attr_value] : flow_file->getAttributes()) {
if (attributes_to_ignore_ && attributes_to_ignore_->contains(attr_key))
continue;
if (attributes_to_log_ && !attributes_to_log_->contains(attr_key))
continue;
message << "\n" << "key:" << attr_key << " value:" << attr_value;
}
message << "\nFlowFile Resource Claim Content";
if (const auto claim = flow_file->getResourceClaim()) {
message << "\n" << "Content Claim:" << claim->getContentFullPath();
}
if (log_payload_ && flow_file->getSize() <= 1024 * 1024) {
message << "\n" << "Payload:" << "\n";
const auto read_result = session.readBuffer(flow_file);
std::string printable_payload;
if (hexencode_) {
printable_payload = utils::string::to_hex(read_result.buffer);
} else {
printable_payload = to_string(read_result);
}
if (max_line_length_ == 0U) {
message << printable_payload << "\n";
} else {
for (size_t j = 0; j < printable_payload.size(); j += max_line_length_) {
message << printable_payload.substr(j, max_line_length_) << '\n';
}
}
} else {
message << "\n";
}
message << dash_line_;
return message.str();
}
void LogAttribute::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
logger_->log_trace("enter log attribute, attempting to retrieve {} flow files", flowfiles_to_log_);
const auto max_flow_files_to_process = flowfiles_to_log_ == 0 ? UINT64_MAX : flowfiles_to_log_;
uint64_t flow_files_processed = 0;
for (; flow_files_processed < max_flow_files_to_process; ++flow_files_processed) {
std::shared_ptr<core::FlowFile> flow = session.get();
if (!flow) {
break;
}
logger_->log_with_level(log_level_, "{}", generateLogMessage(session, flow));
session.transfer(flow, Success);
}
logger_->log_debug("Logged {} flow files", flow_files_processed);
}
REGISTER_RESOURCE(LogAttribute, Processor);
} // namespace org::apache::nifi::minifi::processors