MINIFICPP-984 - Add "Hexencode Payload" and "Maximum Payload Line Length" options to LogAttribute with defaults to emulate the previous behaviour

This closes #625.

Signed-off-by: Marc Parisi <phrocker@apache.org>
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index cc44f25..1dbaf4a 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -56,6 +56,10 @@
 core::Property LogAttribute::LogPayload(core::PropertyBuilder::createProperty("Log Payload")->withDescription("If true, the FlowFile's payload will be logged, in addition to its attributes."
                                                                                                               "otherwise, just the Attributes will be logged")->withDefaultValue<bool>(false)->build());
 
+core::Property LogAttribute::HexencodePayload(core::PropertyBuilder::createProperty("Hexencode Payload")->withDescription("If true, the FlowFile's payload will be logged in a hexencoded format")->withDefaultValue<bool>(false)->build());
+
+core::Property LogAttribute::MaxPayloadLineLength(core::PropertyBuilder::createProperty("Maximum Payload Line Length")->withDescription("The logged payload will be broken into lines this long. 0 means no newlines will be added.")->withDefaultValue<uint32_t>(80U)->build());
+
 core::Property LogAttribute::LogPrefix(
     core::PropertyBuilder::createProperty("Log Prefix")->withDescription("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")->build());
 
@@ -68,6 +72,8 @@
   properties.insert(AttributesToLog);
   properties.insert(AttributesToIgnore);
   properties.insert(LogPayload);
+  properties.insert(HexencodePayload);
+  properties.insert(MaxPayloadLineLength);
   properties.insert(FlowFilesToLog);
   properties.insert(LogPrefix);
   setSupportedProperties(properties);
@@ -87,6 +93,14 @@
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for flowfiles to log: " + flowsToLog.getValue().to_string());
     flowfiles_to_log_ = flowsToLog.getValue();
   }
+
+  std::string value;
+  if (context->getProperty(HexencodePayload.getName(), value)) {
+    utils::StringUtils::StringToBool(value, hexencode_);
+  }
+  if (context->getProperty(MaxPayloadLineLength.getName(), value)) {
+    core::Property::StringToInt(value, max_line_length_);
+  }
 }
 // OnTrigger method, implemented by NiFi LogAttribute
 void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -138,9 +152,19 @@
       ReadCallback callback(logger_, flow->getSize());
       session->read(flow, &callback);
 
-      auto payload_hex = utils::StringUtils::to_hex(callback.buffer_.data(), callback.buffer_.size());
-      for (size_t i = 0; i < payload_hex.size(); i += 80) {
-        message << payload_hex.substr(i, 80) << '\n';
+      std::string printable_payload;
+      if (hexencode_) {
+        printable_payload = utils::StringUtils::to_hex(callback.buffer_.data(), callback.buffer_.size());
+      } else {
+        printable_payload = std::string(reinterpret_cast<char*>(callback.buffer_.data()), callback.buffer_.size());
+      }
+
+      if (max_line_length_ == 0U) {
+        message << printable_payload << "\n";
+      } else {
+        for (size_t i = 0; i < printable_payload.size(); i += max_line_length_) {
+          message << printable_payload.substr(i, max_line_length_) << '\n';
+        }
       }
     } else {
       message << "\n";
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index a8f00eb..99f53d4 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -43,6 +43,8 @@
   LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier())
       : Processor(name, uuid),
         flowfiles_to_log_(1),
+        hexencode_(false),
+        max_line_length_(80U),
         logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
   }
   // Destructor
@@ -55,6 +57,8 @@
   static core::Property AttributesToLog;
   static core::Property AttributesToIgnore;
   static core::Property LogPayload;
+  static core::Property HexencodePayload;
+  static core::Property MaxPayloadLineLength;
   static core::Property LogPrefix;
   static core::Property FlowFilesToLog;
   // Supported Relationships
@@ -119,6 +123,8 @@
 
  private:
   uint64_t flowfiles_to_log_;
+  bool hexencode_;
+  uint32_t max_line_length_;
   // Logger
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
index 589c92c..2b5ee77 100644
--- a/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileCronTest.cpp
@@ -73,7 +73,7 @@
   virtual void runAssertions() {
     assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
     assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
-    assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5")) == true);
+    assert(LogTestController::getInstance().contains("li\\ne5") == true);
   }
 
  protected:
diff --git a/extensions/standard-processors/tests/integration/TailFileTest.cpp b/extensions/standard-processors/tests/integration/TailFileTest.cpp
index 589c92c..2b5ee77 100644
--- a/extensions/standard-processors/tests/integration/TailFileTest.cpp
+++ b/extensions/standard-processors/tests/integration/TailFileTest.cpp
@@ -73,7 +73,7 @@
   virtual void runAssertions() {
     assert(LogTestController::getInstance().contains("5 flowfiles were received from TailFile input") == true);
     assert(LogTestController::getInstance().contains("Looking for delimiter 0xA") == true);
-    assert(LogTestController::getInstance().contains(utils::StringUtils::to_hex("li\\ne5")) == true);
+    assert(LogTestController::getInstance().contains("li\\ne5") == true);
   }
 
  protected:
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 416d3cf..217284f 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -411,17 +411,39 @@
   std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
   plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
+  plan->setProperty(log_attr, processors::LogAttribute::HexencodePayload.getName(), "true");
+
+  uint32_t line_length = 0U;
+  SECTION("with line length 80") {
+    line_length = 80U;
+  }
+  SECTION("with line length 200") {
+    line_length = 200U;
+    plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "200");
+  }
+  SECTION("with line length 0") {
+    line_length = 0U;
+    plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "0");
+  }
+  SECTION("with line length 16") {
+    line_length = 16U;
+    plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "16");
+  }
 
   testController.runSession(plan, true);
 
   REQUIRE(LogTestController::getInstance().contains("Logged 3 flow files"));
   REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line1)));
   auto line2_hex = utils::StringUtils::to_hex(line2);
-  std::stringstream line2_hex_lines;
-  for (size_t i = 0; i < line2_hex.size(); i += 80) {
-    line2_hex_lines << line2_hex.substr(i, 80) << '\n';
+  if (line_length == 0U) {
+    REQUIRE(LogTestController::getInstance().contains(line2_hex));
+  } else {
+    std::stringstream line2_hex_lines;
+    for (size_t i = 0; i < line2_hex.size(); i += line_length) {
+      line2_hex_lines << line2_hex.substr(i, line_length) << '\n';
+    }
+    REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str()));
   }
-  REQUIRE(LogTestController::getInstance().contains(line2_hex_lines.str()));
   REQUIRE(LogTestController::getInstance().contains(utils::StringUtils::to_hex(line3)));
   REQUIRE(false == LogTestController::getInstance().contains(utils::StringUtils::to_hex(line4), std::chrono::seconds(0)));
 
@@ -470,6 +492,7 @@
   std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
   plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
+  plan->setProperty(log_attr, processors::LogAttribute::HexencodePayload.getName(), "true");
 
   testController.runSession(plan, true);