blob: 3f77141e6a57f19905e14ac4a9046ba3a197fc10 [file] [log] [blame]
/**
* @file LogAttribute.cpp
* LogAttribute class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LogAttribute.h"
#include <time.h>
#include <string.h>
#include <memory>
#include <string>
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <iostream>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
core::Property LogAttribute::LogLevel(core::PropertyBuilder::createProperty("Log Level")->withDescription("The Log Level to use when logging the Attributes")->withAllowableValues<std::string>(
{"info", "trace", "error", "warn", "debug" })->build());
core::Property LogAttribute::AttributesToLog(
core::PropertyBuilder::createProperty("Attributes to Log")->withDescription("A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.")->build());
core::Property LogAttribute::FlowFilesToLog(
core::PropertyBuilder::createProperty("FlowFiles To Log")->withDescription(
"Number of flow files to log. If set to zero all flow files will be logged. Please note that this may block other threads from running if not used judiciously.")->withDefaultValue<uint64_t>(1)
->build());
core::Property LogAttribute::AttributesToIgnore(
core::PropertyBuilder::createProperty("Attributes to Ignore")->withDescription("A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.")->build());
core::Property LogAttribute::LogPayload(core::PropertyBuilder::createProperty("Log Payload")->withDescription("If true, the FlowFile's payload will be logged, in addition to its attributes."
"otherwise, just the Attributes will be logged")->withDefaultValue<bool>(false)->build());
core::Property LogAttribute::HexencodePayload(
core::PropertyBuilder::createProperty("Hexencode Payload")->withDescription(
"If true, the FlowFile's payload will be logged in a hexencoded format")->withDefaultValue<bool>(false)->build());
core::Property LogAttribute::MaxPayloadLineLength(
core::PropertyBuilder::createProperty("Maximum Payload Line Length")->withDescription(
"The logged payload will be broken into lines this long. 0 means no newlines will be added.")->withDefaultValue<uint32_t>(80U)->build());
core::Property LogAttribute::LogPrefix(
core::PropertyBuilder::createProperty("Log Prefix")->withDescription("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")->build());
core::Relationship LogAttribute::Success("success", "success operational on the flow record");
void LogAttribute::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(LogLevel);
properties.insert(AttributesToLog);
properties.insert(AttributesToIgnore);
properties.insert(LogPayload);
properties.insert(HexencodePayload);
properties.insert(MaxPayloadLineLength);
properties.insert(FlowFilesToLog);
properties.insert(LogPrefix);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
void LogAttribute::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
context->getProperty(FlowFilesToLog.getName(), flowfiles_to_log_);
logger_->log_debug("FlowFiles To Log: %llu", flowfiles_to_log_);
context->getProperty(HexencodePayload.getName(), hexencode_);
context->getProperty(MaxPayloadLineLength.getName(), max_line_length_);
logger_->log_debug("Maximum Payload Line Length: %u", max_line_length_);
}
// OnTrigger method, implemented by NiFi LogAttribute
void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
logger_->log_trace("enter log attribute, attempting to retrieve %u flow files", flowfiles_to_log_);
std::string dashLine = "--------------------------------------------------";
LogAttrLevel level = LogAttrLevelInfo;
bool logPayload = false;
uint64_t i = 0;
const auto max = flowfiles_to_log_ == 0 ? UINT64_MAX : flowfiles_to_log_;
for (; i < max; ++i) {
std::shared_ptr<core::FlowFile> flow = session->get();
if (!flow) {
break;
}
std::string value;
if (context->getProperty(LogLevel.getName(), value)) {
logLevelStringToEnum(value, level);
}
if (context->getProperty(LogPrefix.getName(), value)) {
dashLine = "-----" + value + "-----";
}
context->getProperty(LogPayload.getName(), logPayload);
std::ostringstream message;
message << "Logging for flow file " << "\n";
message << dashLine;
message << "\nStandard FlowFile Attributes";
message << "\n" << "UUID:" << flow->getUUIDStr();
message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow->getEntryDate());
message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow->getlineageStartDate());
message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
message << "\nFlowFile Attributes Map Content";
std::map<std::string, std::string> attrs = flow->getAttributes();
std::map<std::string, std::string>::iterator it;
for (it = attrs.begin(); it != attrs.end(); it++) {
message << "\n" << "key:" << it->first << " value:" << it->second;
}
message << "\nFlowFile Resource Claim Content";
std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
if (claim) {
message << "\n" << "Content Claim:" << claim->getContentFullPath();
}
if (logPayload && flow->getSize() <= 1024 * 1024) {
message << "\n" << "Payload:" << "\n";
ReadCallback callback(logger_, gsl::narrow<size_t>(flow->getSize()));
session->read(flow, &callback);
std::string printable_payload;
if (hexencode_) {
printable_payload = utils::StringUtils::to_hex(callback.buffer_.data(), callback.buffer_.size());
} else {
printable_payload = std::string(reinterpret_cast<char*>(callback.buffer_.data()), callback.buffer_.size());
}
if (max_line_length_ == 0U) {
message << printable_payload << "\n";
} else {
for (size_t i = 0; i < printable_payload.size(); i += max_line_length_) {
message << printable_payload.substr(i, max_line_length_) << '\n';
}
}
} else {
message << "\n";
}
message << dashLine;
std::string output = message.str();
switch (level) {
case LogAttrLevelInfo:
logging::LOG_INFO(logger_) << output;
break;
case LogAttrLevelDebug:
logging::LOG_DEBUG(logger_) << output;
break;
case LogAttrLevelError:
logging::LOG_ERROR(logger_) << output;
break;
case LogAttrLevelTrace:
logging::LOG_TRACE(logger_) << output;
break;
case LogAttrLevelWarn:
logging::LOG_WARN(logger_) << output;
break;
default:
break;
}
session->transfer(flow, Success);
}
logger_->log_debug("Logged %d flow files", i);
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */