MINIFICPP-1326 improve PublishKafka logging

..., move implementation details to the .cpp file, and
make other small refactors and improvements
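
For illustration, the per-batch trace output produced by the new
Messages::logStatus helper looks roughly like this (hypothetical flow
file/segment indices, error text abbreviated):

    [0]: {error, }, [2]: {<1>: (msg Error, <rd_kafka_err2str text>), }, in-flight (2): 3,5

Flow files whose segments were all delivered successfully are omitted to
reduce log spam, and flow files that are still fully in flight are only
summarized in the trailing "in-flight" list.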

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #868
diff --git a/Windows.md b/Windows.md
index d453955..c2400be 100644
--- a/Windows.md
+++ b/Windows.md
@@ -62,6 +62,7 @@
 | /J | Enables JNI |
 | /64 | Creates 64-bit build instead of a 32-bit one |
 | /D | Builds RelWithDebInfo build instead of Release |
+| /DD | Builds Debug build instead of Release |
 | /CI | Sets STRICT_GSL_CHECKS to AUDIT |
 
 Examples:
diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index cf871a8..3d412ad 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -87,7 +87,7 @@
   return nullptr;
 }
 
-KafkaConnectionKey const * const KafkaConnection::getKey() const {
+KafkaConnectionKey const* KafkaConnection::getKey() const {
   return &key_;
 }
 
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 816b40a..8d5b12e 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -37,14 +37,16 @@
 namespace minifi {
 namespace processors {
 
-class KafkaConnectionKey {
- public:
-    std::string client_id_;
-    std::string brokers_;
+struct KafkaConnectionKey {
+  std::string client_id_;
+  std::string brokers_;
 
-    bool operator <(const KafkaConnectionKey& rhs) const {
-      return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_);
-    }
+  bool operator< (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) <  std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator<=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) <= std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator==(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) == std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator!=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) != std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator> (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) >  std::tie(rhs.brokers_, rhs.client_id_); }
+  bool operator>=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) >= std::tie(rhs.brokers_, rhs.client_id_); }
 };
 
 class KafkaConnection {
@@ -70,7 +72,7 @@
 
   std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) const;
 
-  KafkaConnectionKey const * const getKey() const;
+  KafkaConnectionKey const* getKey() const;
 
   void putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic);
 
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index fc5ee56..1b92edc 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -39,6 +39,21 @@
 namespace minifi {
 namespace processors {
 
+#define COMPRESSION_CODEC_NONE "none"
+#define COMPRESSION_CODEC_GZIP "gzip"
+#define COMPRESSION_CODEC_SNAPPY "snappy"
+#define ROUND_ROBIN_PARTITIONING "Round Robin"
+#define RANDOM_PARTITIONING "Random Robin"
+#define USER_DEFINED_PARTITIONING "User-Defined"
+#define DELIVERY_REPLICATED "all"
+#define DELIVERY_ONE_NODE "1"
+#define DELIVERY_BEST_EFFORT "0"
+#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define SECURITY_PROTOCOL_SSL "ssl"
+#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
+#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
+#define KAFKA_KEY_ATTRIBUTE "kafka.key"
+
 const core::Property PublishKafka::SeedBrokers(
     core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
@@ -144,6 +159,305 @@
 struct rd_kafka_topic_conf_deleter {
   void operator()(rd_kafka_topic_conf_t* p) const noexcept { rd_kafka_topic_conf_destroy(p); }
 };
+
+// Message
+enum class MessageStatus : uint8_t {
+  InFlight,
+  Error,
+  Success
+};
+
+const char* to_string(const MessageStatus s) {
+  switch (s) {
+    case MessageStatus::InFlight: return "InFlight";
+    case MessageStatus::Error: return "Error";
+    case MessageStatus::Success: return "Success";
+  }
+  throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+  MessageStatus status = MessageStatus::InFlight;
+  rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+  bool flow_file_error = false;
+  std::vector<MessageResult> messages;
+};
+}  // namespace
+
+class PublishKafka::Messages {
+  std::mutex mutex_;
+  std::condition_variable cv_;
+  std::vector<FlowFileResult> flow_files_;
+  bool interrupted_ = false;
+  const std::shared_ptr<logging::Logger> logger_;
+
+  std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+    gsl_Expects(lock.owns_lock());
+    const auto messageresult_ok = [](const MessageResult r) { return r.status == MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+    const auto messageresult_inflight = [](const MessageResult r) { return r.status == MessageStatus::InFlight && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+    std::vector<size_t> flow_files_in_flight;
+    std::ostringstream oss;
+    if (interrupted_) { oss << "interrupted, "; }
+    for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+      const auto& flow_file = flow_files_[ffi];
+      if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_ok)) {
+        continue;  // don't log the happy path to reduce log spam
+      }
+      if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_inflight)) {
+        flow_files_in_flight.push_back(ffi);
+        continue;  // don't log fully in-flight flow files here, log them at the end instead
+      }
+      oss << '[' << ffi << "]: {";
+      if (flow_file.flow_file_error) { oss << "error, "; }
+      for (size_t msgi = 0; msgi < flow_file.messages.size(); ++msgi) {
+        const auto& msg = flow_file.messages[msgi];
+        if (messageresult_ok(msg)) {
+          continue;
+        }
+        oss << '<' << msgi << ">: (msg " << to_string(msg.status) << ", " << rd_kafka_err2str(msg.err_code) << "), ";
+      }
+      oss << "}, ";
+    }
+    oss << "in-flight (" << flow_files_in_flight.size() << "): " << utils::StringUtils::join(",", flow_files_in_flight);
+    return oss.str();
+  }
+
+ public:
+  explicit Messages(std::shared_ptr<logging::Logger> logger) :logger_{std::move(logger)} {}
+
+  void waitForCompletion() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this, &lock] {
+      if (logger_->should_log(logging::LOG_LEVEL::trace)) {
+        logger_->log_trace("%s", logStatus(lock));
+      }
+      return interrupted_ || std::all_of(std::begin(this->flow_files_), std::end(this->flow_files_), [](const FlowFileResult& flow_file) {
+        return flow_file.flow_file_error || std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), [](const MessageResult& message) {
+          return message.status != MessageStatus::InFlight;
+        });
+      });
+    });
+  }
+
+  template<typename Func>
+  auto modifyResult(size_t index, Func fun) -> decltype(fun(flow_files_.at(index))) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    const auto notifier = gsl::finally([this]{ cv_.notify_all(); });
+    try {
+      return fun(flow_files_.at(index));
+    } catch(const std::exception& ex) {
+      logger_->log_warn("Messages::modifyResult exception: %s", ex.what());
+      throw;
+    }
+  }
+
+  size_t addFlowFile() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    flow_files_.emplace_back();
+    return flow_files_.size() - 1;
+  }
+
+  template<typename Func>
+  auto iterateFlowFiles(Func fun) -> utils::void_t<decltype(fun(size_t{0}, flow_files_.front()))> {
+    std::lock_guard<std::mutex> lock(mutex_);
+    for (size_t index = 0U; index < flow_files_.size(); index++) {
+      fun(index, flow_files_[index]);
+    }
+  }
+
+  void interrupt() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    interrupted_ = true;
+    cv_.notify_all();
+    gsl_Ensures(interrupted_);
+  }
+
+  bool wasInterrupted() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return interrupted_;
+  }
+};
+
+namespace {
+class ReadCallback : public InputStreamCallback {
+ public:
+  struct rd_kafka_headers_deleter {
+    void operator()(rd_kafka_headers_t* ptr) const noexcept {
+      rd_kafka_headers_destroy(ptr);
+    }
+  };
+
+  using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+ private:
+  void allocate_message_object(const size_t segment_num) const {
+    messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+      // allocate message object to be filled in by the callback in produce()
+      if (flow_file.messages.size() < segment_num + 1) {
+        flow_file.messages.resize(segment_num + 1);
+      }
+      gsl_Ensures(flow_file.messages.size() > segment_num);
+    });
+  }
+
+  static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
+    const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+    if (!result) { throw std::bad_alloc{}; }
+
+    for (const auto& kv : flow_file.getAttributes()) {
+      if (attribute_name_regex.match(kv.first)) {
+        rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+      }
+    }
+    return rd_kafka_headers_unique_ptr{ result };
+  }
+
+  rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+    const std::shared_ptr<PublishKafka::Messages> messages_ptr_copy = this->messages_;
+    const auto flow_file_index_copy = this->flow_file_index_;
+    const auto logger = logger_;
+    const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num, logger](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+      messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage, logger, flow_file_index_copy](FlowFileResult &flow_file) {
+        auto &message = flow_file.messages.at(segment_num);
+        message.err_code = rkmessage->err;
+        message.status = message.err_code == 0 ? MessageStatus::Success : MessageStatus::Error;
+        if (message.err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
+          logger->log_warn("delivery callback, flow file #%zu/segment #%zu: %s", flow_file_index_copy, segment_num, rd_kafka_err2str(message.err_code));
+        } else {
+          logger->log_debug("delivery callback, flow file #%zu/segment #%zu: success", flow_file_index_copy, segment_num);
+        }
+      });
+    };
+    // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+    auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+    allocate_message_object(segment_num);
+
+    const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
+    const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+        RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+    if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
+      // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
+      // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
+      (void)callback_ptr.release();
+    } else {
+      // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
+      rd_kafka_headers_destroy(hdrs_copy);
+    }
+    logger_->log_trace("produce enqueued flow file #%zu/segment #%zu: %s", flow_file_index_, segment_num, rd_kafka_err2str(err));
+    return err;
+  }
+
+ public:
+  ReadCallback(const uint64_t max_seg_size,
+      std::string key,
+      rd_kafka_topic_t* const rkt,
+      rd_kafka_t* const rk,
+      const core::FlowFile& flowFile,
+      utils::Regex& attributeNameRegex,
+      std::shared_ptr<PublishKafka::Messages> messages,
+      const size_t flow_file_index,
+      const bool fail_empty_flow_files,
+      std::shared_ptr<logging::Logger> logger)
+      : flow_size_(flowFile.getSize()),
+      max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+      key_(std::move(key)),
+      rkt_(rkt),
+      rk_(rk),
+      hdrs(make_headers(flowFile, attributeNameRegex)),
+      messages_(std::move(messages)),
+      flow_file_index_(flow_file_index),
+      fail_empty_flow_files_(fail_empty_flow_files),
+      logger_(std::move(logger))
+  { }
+
+  ReadCallback(const ReadCallback&) = delete;
+  ReadCallback& operator=(ReadCallback) = delete;
+
+  int64_t process(const std::shared_ptr<io::BaseStream> stream) override {
+    std::vector<unsigned char> buffer;
+
+    buffer.resize(max_seg_size_);
+    read_size_ = 0;
+    status_ = 0;
+    called_ = true;
+
+    gsl_Expects(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+    // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+    const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+    messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+      flow_file.messages.reserve(reserved_msg_capacity);
+    });
+
+    // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
+    if (flow_size_ == 0 && !fail_empty_flow_files_) {
+      const auto err = produce(0, buffer, 0);
+      if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        status_ = -1;
+        error_ = rd_kafka_err2str(err);
+      }
+      return 0;
+    }
+
+    for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
+      const int readRet = stream->read(buffer.data(), buffer.size());
+      if (readRet < 0) {
+        status_ = -1;
+        error_ = "Failed to read from stream";
+        return read_size_;
+      }
+
+      if (readRet <= 0) { break; }
+
+      const auto err = produce(segment_num, buffer, readRet);
+      if (err) {
+        messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
+          auto& message = flow_file.messages.at(segment_num);
+          message.status = MessageStatus::Error;
+          message.err_code = err;
+        });
+        status_ = -1;
+        error_ = rd_kafka_err2str(err);
+        return read_size_;
+      }
+      read_size_ += readRet;
+    }
+    return read_size_;
+  }
+
+  const uint64_t flow_size_ = 0;
+  const uint64_t max_seg_size_ = 0;
+  const std::string key_;
+  rd_kafka_topic_t* const rkt_ = nullptr;
+  rd_kafka_t* const rk_ = nullptr;
+  const rd_kafka_headers_unique_ptr hdrs;  // not null
+  const std::shared_ptr<PublishKafka::Messages> messages_;
+  const size_t flow_file_index_;
+  int status_ = 0;
+  std::string error_;
+  int read_size_ = 0;
+  bool called_ = false;
+  const bool fail_empty_flow_files_ = true;
+  const std::shared_ptr<logging::Logger> logger_;
+};
+
+/**
+ * Message delivery report callback using the richer rd_kafka_message_t object.
+ */
+void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
+  if (rkmessage->_private == nullptr) {
+    return;
+  }
+  // allocated in ReadCallback::produce
+  auto* const func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
+  try {
+    (*func)(rk, rkmessage);
+  } catch (...) { }
+  delete func;
+}
 }  // namespace
 
 void PublishKafka::initialize() {
@@ -235,20 +549,6 @@
   conn_.reset();
 }
 
-/**
- * Message delivery report callback using the richer rd_kafka_message_t object.
- */
-void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
-  if (rkmessage->_private == nullptr) {
-    return;
-  }
-  // allocated in PublishKafka::ReadCallback::produce
-  auto* func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
-  try {
-    (*func)(rk, rkmessage);
-  } catch (...) { }
-  delete func;
-}
 
 bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
   std::string value;
@@ -256,14 +556,14 @@
   std::string valueConf;
   std::array<char, 512U> errstr{};
   rd_kafka_conf_res_t result;
-  const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
+  const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
 
   std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() };
   if (conf_ == nullptr) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
   }
 
-  auto key = conn_->getKey();
+  const auto* const key = conn_->getKey();
 
   if (key->brokers_.empty()) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
@@ -451,7 +751,7 @@
   }
 
   // Set the delivery callback
-  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &PublishKafka::messageDeliveryCallback);
+  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);
 
   // Set the logger callback
   rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
@@ -579,13 +879,13 @@
   }
   logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
 
-  auto messages = std::make_shared<Messages>();
+  auto messages = std::make_shared<Messages>(logger_);
   // We must add this to the messages set, so that it will be interrupted when notifyStop is called
   {
     std::lock_guard<std::mutex> lock(messages_mutex_);
     messages_set_.emplace(messages);
   }
-  // We also have to insure that it will be removed once we are done with it
+  // We also have to ensure that it will be removed once we are done with it
   const auto messagesSetGuard = gsl::finally([&]() {
     std::lock_guard<std::mutex> lock(messages_mutex_);
     messages_set_.erase(messages);
@@ -636,8 +936,8 @@
     bool failEmptyFlowFiles = true;
     context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
 
-    PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
-                                        attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
+    ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
+                                        attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
     session->read(flowFile, &callback);
 
     if (!callback.called_) {
@@ -677,20 +977,20 @@
       for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
         const auto& message = flow_file.messages[segment_num];
         switch (message.status) {
-          case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
+          case MessageStatus::InFlight:
             success = false;
             logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
                 flowFiles[index]->getUUIDStr(),
                 segment_num);
           break;
-          case MessageStatus::MESSAGESTATUS_ERROR:
+          case MessageStatus::Error:
             success = false;
             logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
                 flowFiles[index]->getUUIDStr(),
                 segment_num,
                 rd_kafka_err2str(message.err_code));
           break;
-          case MessageStatus::MESSAGESTATUS_SUCCESS:
+          case MessageStatus::Success:
             logger_->log_debug("Successfully delivered flow file %s segment %zu",
                 flowFiles[index]->getUUIDStr(),
                 segment_num);
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 5d16717..7cc9101 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -50,36 +50,9 @@
 namespace minifi {
 namespace processors {
 
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
-#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
-#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-
 // PublishKafka Class
 class PublishKafka : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
-  explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
-      : core::Processor(std::move(name), uuid),
-        logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
-        interrupted_(false) {
-  }
-
-  virtual ~PublishKafka() = default;
-
   static constexpr char const* ProcessorName = "PublishKafka";
 
   // Supported Properties
@@ -114,229 +87,14 @@
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
-  // Message
-  enum class MessageStatus : uint8_t {
-    MESSAGESTATUS_UNCOMPLETE,
-    MESSAGESTATUS_ERROR,
-    MESSAGESTATUS_SUCCESS
-  };
-
-  struct MessageResult {
-    MessageStatus status = MessageStatus::MESSAGESTATUS_UNCOMPLETE;
-    rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_UNKNOWN;
-  };
-
-  struct FlowFileResult {
-    bool flow_file_error = false;
-    std::vector<MessageResult> messages;
-  };
-
-  struct Messages {
-    std::mutex mutex;
-    std::condition_variable cv;
-    std::vector<FlowFileResult> flow_files;
-    bool interrupted = false;
-
-    void waitForCompletion() {
-      std::unique_lock<std::mutex> lock(mutex);
-      cv.wait(lock, [this]() -> bool {
-        if (interrupted) {
-          return true;
-        }
-        size_t index = 0U;
-        return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) {
-          index++;
-          if (flow_file.flow_file_error) {
-            return true;
-          }
-          return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
-            return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE;
-          });
-        });
-      });
-    }
-
-    void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
-      std::unique_lock<std::mutex> lock(mutex);
-      fun(flow_files.at(index));
-      cv.notify_all();
-    }
-
-    size_t addFlowFile() {
-      std::lock_guard<std::mutex> lock(mutex);
-      flow_files.emplace_back();
-      return flow_files.size() - 1;
-    }
-
-    void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
-      std::lock_guard<std::mutex> lock(mutex);
-      for (size_t index = 0U; index < flow_files.size(); index++) {
-        fun(index, flow_files[index]);
-      }
-    }
-
-    void interrupt() {
-      std::unique_lock<std::mutex> lock(mutex);
-      interrupted = true;
-      cv.notify_all();
-    }
-
-    bool wasInterrupted() {
-      std::lock_guard<std::mutex> lock(mutex);
-      return interrupted;
-    }
-  };
-
-  // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
-   public:
-    struct rd_kafka_headers_deleter {
-      void operator()(rd_kafka_headers_t* ptr) const noexcept {
-        rd_kafka_headers_destroy(ptr);
-      }
-    };
-
-    using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
-
-   private:
-    void allocate_message_object(const size_t segment_num) const {
-      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
-        // allocate message object to be filled in by the callback in produce()
-        if (flow_file.messages.size() < segment_num + 1) {
-          flow_file.messages.resize(segment_num + 1);
-        }
-      });
-    }
-
-    static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
-      const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
-      if (!result) { throw std::bad_alloc{}; }
-
-      for (const auto& kv : flow_file.getAttributes()) {
-        if (attribute_name_regex.match(kv.first)) {
-          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
-      }
-      return rd_kafka_headers_unique_ptr{ result };
-    }
-
-    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
-      const std::shared_ptr<Messages> messages_ptr_copy = this->messages_;
-      const auto flow_file_index_copy = this->flow_file_index_;
-      const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
-        messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
-          auto &message = flow_file.messages.at(segment_num);
-          message.err_code = rkmessage->err;
-          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
-        });
-      };
-      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
-      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
-
-      allocate_message_object(segment_num);
-
-      const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
-      const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
-                                         RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
-      if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
-        // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
-        // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
-        (void)callback_ptr.release();
-      } else {
-        // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
-        rd_kafka_headers_destroy(hdrs_copy);
-      }
-      return err;
-    }
-
-   public:
-    ReadCallback(const uint64_t max_seg_size,
-                 std::string key,
-                 rd_kafka_topic_t * const rkt,
-                 rd_kafka_t * const rk,
-                 const core::FlowFile& flowFile,
-                 utils::Regex &attributeNameRegex,
-                 std::shared_ptr<Messages> messages,
-                 const size_t flow_file_index,
-                 const bool fail_empty_flow_files)
-        : flow_size_(flowFile.getSize()),
-          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
-          key_(std::move(key)),
-          rkt_(rkt),
-          rk_(rk),
-          hdrs(make_headers(flowFile, attributeNameRegex)),
-          messages_(std::move(messages)),
-          flow_file_index_(flow_file_index),
-          fail_empty_flow_files_(fail_empty_flow_files)
-    { }
-
-    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
-      std::vector<unsigned char> buffer;
-
-      buffer.resize(max_seg_size_);
-      read_size_ = 0;
-      status_ = 0;
-      called_ = true;
-
-      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
-      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
-      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
-      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
-        flow_file.messages.reserve(reserved_msg_capacity);
-      });
-
-      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
-      if (flow_size_ == 0 && !fail_empty_flow_files_) {
-        produce(0, buffer, 0);
-        return 0;
-      }
-
-      for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
-        const int readRet = stream->read(buffer.data(), buffer.size());
-        if (readRet < 0) {
-          status_ = -1;
-          error_ = "Failed to read from stream";
-          return read_size_;
-        }
-
-        if (readRet <= 0) { break; }
-
-        const auto err = produce(segment_num, buffer, readRet);
-        if (err) {
-          messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
-            auto& message = flow_file.messages.at(segment_num);
-            message.status = MessageStatus::MESSAGESTATUS_ERROR;
-            message.err_code = err;
-          });
-          status_ = -1;
-          error_ = rd_kafka_err2str(err);
-          return read_size_;
-        }
-        read_size_ += readRet;
-      }
-      return read_size_;
-    }
-
-    const uint64_t flow_size_ = 0;
-    const uint64_t max_seg_size_ = 0;
-    const std::string key_;
-    rd_kafka_topic_t * const rkt_ = nullptr;
-    rd_kafka_t * const rk_ = nullptr;
-    const rd_kafka_headers_unique_ptr hdrs;  // not null
-    const std::shared_ptr<Messages> messages_;
-    const size_t flow_file_index_;
-    int status_ = 0;
-    std::string error_;
-    int read_size_ = 0;
-    bool called_ = false;
-    const bool fail_empty_flow_files_ = true;
-  };
-
- public:
-  bool supportsDynamicProperties() override {
-    return true;
+  explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
+      : core::Processor(std::move(name), uuid) {
   }
 
+  ~PublishKafka() override = default;
+
+  bool supportsDynamicProperties() override { return true; }
+
   /**
    * Function that's executed when the processor is scheduled.
    * @param context process context.
@@ -348,25 +106,25 @@
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void notifyStop() override;
 
+  class Messages;
+
  protected:
   bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
   bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
 
  private:
-  static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
-
-  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<PublishKafka>::getLogger()};
 
   KafkaConnectionKey key_;
   std::unique_ptr<KafkaConnection> conn_;
   std::mutex connection_mutex_;
 
-  uint32_t batch_size_;
-  uint64_t target_batch_payload_size_;
-  uint64_t max_flow_seg_size_;
+  uint32_t batch_size_{};
+  uint64_t target_batch_payload_size_{};
+  uint64_t max_flow_seg_size_{};
   utils::Regex attributeNameRegex_;
 
-  std::atomic<bool> interrupted_;
+  std::atomic<bool> interrupted_{false};
   std::mutex messages_mutex_;
   std::set<std::shared_ptr<Messages>> messages_set_;
 };
diff --git a/win_build_vs.bat b/win_build_vs.bat
index a2beefa..3a24778 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -64,6 +64,9 @@
     if [%%~x] EQU [/D] (
         set cmake_build_type=RelWithDebInfo
     )
+    if [%%~x] EQU [/DD] (
+        set cmake_build_type=Debug
+    )
     if [%%~x] EQU [/CI] (
         set "strict_gsl_checks=-DSTRICT_GSL_CHECKS=AUDIT"
     )