MINIFICPP-984 - Add "Hexencode Payload" and "Maximum Payload Line Length" options to LogAttribute with defaults to emulate the previous behaviour
This closes #625.
Signed-off-by: Marc Parisi <phrocker@apache.org>
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index cc44f25..1dbaf4a 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -56,6 +56,10 @@
core::Property LogAttribute::LogPayload(core::PropertyBuilder::createProperty("Log Payload")->withDescription("If true, the FlowFile's payload will be logged, in addition to its attributes."
"otherwise, just the Attributes will be logged")->withDefaultValue<bool>(false)->build());
+core::Property LogAttribute::HexencodePayload(core::PropertyBuilder::createProperty("Hexencode Payload")->withDescription("If true, the FlowFile's payload will be logged in a hexencoded format")->withDefaultValue<bool>(false)->build());
+
+core::Property LogAttribute::MaxPayloadLineLength(core::PropertyBuilder::createProperty("Maximum Payload Line Length")->withDescription("The logged payload will be broken into lines this long. 0 means no newlines will be added.")->withDefaultValue<uint32_t>(80U)->build());
+
core::Property LogAttribute::LogPrefix(
core::PropertyBuilder::createProperty("Log Prefix")->withDescription("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")->build());
@@ -68,6 +72,8 @@
properties.insert(AttributesToLog);
properties.insert(AttributesToIgnore);
properties.insert(LogPayload);
+ properties.insert(HexencodePayload);
+ properties.insert(MaxPayloadLineLength);
properties.insert(FlowFilesToLog);
properties.insert(LogPrefix);
setSupportedProperties(properties);
@@ -87,6 +93,14 @@
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
flowfiles_to_log_ = flowsToLog.getValue();
}
+
+ std::string value;
+ if (context->getProperty(HexencodePayload.getName(), value)) {
+ utils::StringUtils::StringToBool(value, hexencode_);
+ }
+ if (context->getProperty(MaxPayloadLineLength.getName(), value)) {
+ core::Property::StringToInt(value, max_line_length_);
+ }
}
// OnTrigger method, implemented by NiFi LogAttribute
void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -138,9 +152,19 @@
ReadCallback callback(logger_, flow->getSize());
session->read(flow, &callback);
- auto payload_hex = utils::StringUtils::to_hex(callback.buffer_.data(), callback.buffer_.size());
- for (size_t i = 0; i < payload_hex.size(); i += 80) {
- message << payload_hex.substr(i, 80) << '\n';
+ std::string printable_payload;
+ if (hexencode_) {
+ printable_payload = utils::StringUtils::to_hex(callback.buffer_.data(), callback.buffer_.size());
+ } else {
+ printable_payload = std::string(reinterpret_cast<char*>(callback.buffer_.data()), callback.buffer_.size());
+ }
+
+ if (max_line_length_ == 0U) {
+ message << printable_payload << "\n";
+ } else {
+ for (size_t i = 0; i < printable_payload.size(); i += max_line_length_) {
+ message << printable_payload.substr(i, max_line_length_) << '\n';
+ }
}
} else {
message << "\n";
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index a8f00eb..99f53d4 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -43,6 +43,8 @@
LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
flowfiles_to_log_(1),
+ hexencode_(false),
+ max_line_length_(80U),
logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
}
// Destructor
@@ -55,6 +57,8 @@
static core::Property AttributesToLog;
static core::Property AttributesToIgnore;
static core::Property LogPayload;
+ static core::Property HexencodePayload;
+ static core::Property MaxPayloadLineLength;
static core::Property LogPrefix;
static core::Property FlowFilesToLog;
// Supported Relationships
@@ -119,6 +123,8 @@
private:
uint64_t flowfiles_to_log_;
+ bool hexencode_;
+ uint32_t max_line_length_;
// Logger
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
index 589c92c..2b5ee77 100644
--- a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
@@ -73,7 +73,7 @@
virtual void runAssertions() {
assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
- assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5")) == true);
+ assert(LogTestController::getInstance().contains("li\\ne5") == true);
}
protected:
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index 589c92c..2b5ee77 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -73,7 +73,7 @@
virtual void runAssertions() {
assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
- assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5")) == true);
+ assert(LogTestController::getInstance().contains("li\\ne5") == true);
}
protected:
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 416d3cf..217284f 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -411,17 +411,39 @@
std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
+ plan->setProperty(log_attr, processors::LogAttribute::HexencodePayload.getName(), "true");
+
+ uint32_t line_length = 0U;
+ SECTION("with line length 80") {
+ line_length = 80U;
+ }
+ SECTION("with line length 200") {
+ line_length = 200U;
+ plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "200");
+ }
+ SECTION("with line length 0") {
+ line_length = 0U;
+ plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "0");
+ }
+ SECTION("with line length 16") {
+ line_length = 16U;
+ plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "16");
+ }
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line1)));
auto line2_hex = utils::StringUtils::to_hex(line2);
- std::stringstream line2_hex_lines;
- for (size_t i = 0; i < line2_hex.size(); i += 80) {
- line2_hex_lines << line2_hex.substr(i, 80) << '\n';
+ if (line_length == 0U) {
+ REQUIRE(LogTestController::getInstance().contains(line2_hex));
+ } else {
+ std::stringstream line2_hex_lines;
+ for (size_t i = 0; i < line2_hex.size(); i += line_length) {
+ line2_hex_lines << line2_hex.substr(i, line_length) << '\n';
+ }
+ REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str()));
}
- REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str()));
REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line3)));
REQUIRE(false == LogTestController::getInstance().contains(utils::StringUtils::to_hex(line4), std::chrono::seconds(0)));
@@ -470,6 +492,7 @@
std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
+ plan->setProperty(log_attr, processors::LogAttribute::HexencodePayload.getName(), "true");
testController.runSession(plan, true);