MINIFICPP-1326 improve PublishKafka logging
Also move implementation details to the .cpp file, and make
other small refactors and improvements.
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #868
diff --git a/Windows.md b/Windows.md
index d453955..c2400be 100644
--- a/Windows.md
+++ b/Windows.md
@@ -62,6 +62,7 @@
| /J | Enables JNI |
| /64 | Creates 64-bit build instead of a 32-bit one |
| /D | Builds RelWithDebInfo build instead of Release |
+| /DD | Builds Debug build instead of Release |
| /CI | Sets STRICT_GSL_CHECKS to AUDIT |
Examples:
diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index cf871a8..3d412ad 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -87,7 +87,7 @@
return nullptr;
}
-KafkaConnectionKey const * const KafkaConnection::getKey() const {
+KafkaConnectionKey const* KafkaConnection::getKey() const {
return &key_;
}
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 816b40a..8d5b12e 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -37,14 +37,16 @@
namespace minifi {
namespace processors {
-class KafkaConnectionKey {
- public:
- std::string client_id_;
- std::string brokers_;
+struct KafkaConnectionKey {
+ std::string client_id_;
+ std::string brokers_;
- bool operator <(const KafkaConnectionKey& rhs) const {
- return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_);
- }
+ bool operator< (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_); }
+ bool operator<=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) <= std::tie(rhs.brokers_, rhs.client_id_); }
+ bool operator==(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) == std::tie(rhs.brokers_, rhs.client_id_); }
+ bool operator!=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) != std::tie(rhs.brokers_, rhs.client_id_); }
+ bool operator> (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) > std::tie(rhs.brokers_, rhs.client_id_); }
+ bool operator>=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) >= std::tie(rhs.brokers_, rhs.client_id_); }
};
class KafkaConnection {
@@ -70,7 +72,7 @@
std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) const;
- KafkaConnectionKey const * const getKey() const;
+ KafkaConnectionKey const* getKey() const;
void putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic);
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index fc5ee56..1b92edc 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -39,6 +39,21 @@
namespace minifi {
namespace processors {
+#define COMPRESSION_CODEC_NONE "none"
+#define COMPRESSION_CODEC_GZIP "gzip"
+#define COMPRESSION_CODEC_SNAPPY "snappy"
+#define ROUND_ROBIN_PARTITIONING "Round Robin"
+#define RANDOM_PARTITIONING "Random Robin"
+#define USER_DEFINED_PARTITIONING "User-Defined"
+#define DELIVERY_REPLICATED "all"
+#define DELIVERY_ONE_NODE "1"
+#define DELIVERY_BEST_EFFORT "0"
+#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
+#define SECURITY_PROTOCOL_SSL "ssl"
+#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
+#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
+#define KAFKA_KEY_ATTRIBUTE "kafka.key"
+
const core::Property PublishKafka::SeedBrokers(
core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
->isRequired(true)->supportsExpressionLanguage(true)->build());
@@ -144,6 +159,305 @@
struct rd_kafka_topic_conf_deleter {
void operator()(rd_kafka_topic_conf_t* p) const noexcept { rd_kafka_topic_conf_destroy(p); }
};
+
+// Delivery status of a single message (one segment of a flow file)
+enum class MessageStatus : uint8_t {
+ InFlight,
+ Error,
+ Success
+};
+
+const char* to_string(const MessageStatus s) {
+ switch (s) {
+ case MessageStatus::InFlight: return "InFlight";
+ case MessageStatus::Error: return "Error";
+ case MessageStatus::Success: return "Success";
+ }
+ throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+ MessageStatus status = MessageStatus::InFlight;
+ rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+ bool flow_file_error = false;
+ std::vector<MessageResult> messages;
+};
+} // namespace
+
+class PublishKafka::Messages {
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::vector<FlowFileResult> flow_files_;
+ bool interrupted_ = false;
+ const std::shared_ptr<logging::Logger> logger_;
+
+ std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+ gsl_Expects(lock.owns_lock());
+ const auto messageresult_ok = [](const MessageResult r) { return r.status == MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+ const auto messageresult_inflight = [](const MessageResult r) { return r.status == MessageStatus::InFlight && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+ std::vector<size_t> flow_files_in_flight;
+ std::ostringstream oss;
+ if (interrupted_) { oss << "interrupted, "; }
+ for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+ const auto& flow_file = flow_files_[ffi];
+ if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_ok)) {
+ continue; // don't log the happy path to reduce log spam
+ }
+ if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_inflight)) {
+ flow_files_in_flight.push_back(ffi);
+ continue; // don't log fully in-flight flow files here, log them at the end instead
+ }
+ oss << '[' << ffi << "]: {";
+ if (flow_file.flow_file_error) { oss << "error, "; }
+ for (size_t msgi = 0; msgi < flow_file.messages.size(); ++msgi) {
+ const auto& msg = flow_file.messages[msgi];
+ if (messageresult_ok(msg)) {
+ continue;
+ }
+ oss << '<' << msgi << ">: (msg " << to_string(msg.status) << ", " << rd_kafka_err2str(msg.err_code) << "), ";
+ }
+ oss << "}, ";
+ }
+ oss << "in-flight (" << flow_files_in_flight.size() << "): " << utils::StringUtils::join(",", flow_files_in_flight);
+ return oss.str();
+ }
+
+ public:
+ explicit Messages(std::shared_ptr<logging::Logger> logger) :logger_{std::move(logger)} {}
+
+ void waitForCompletion() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ cv_.wait(lock, [this, &lock] {
+ if (logger_->should_log(logging::LOG_LEVEL::trace)) {
+ logger_->log_trace("%s", logStatus(lock));
+ }
+ return interrupted_ || std::all_of(std::begin(this->flow_files_), std::end(this->flow_files_), [](const FlowFileResult& flow_file) {
+ return flow_file.flow_file_error || std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), [](const MessageResult& message) {
+ return message.status != MessageStatus::InFlight;
+ });
+ });
+ });
+ }
+
+ template<typename Func>
+ auto modifyResult(size_t index, Func fun) -> decltype(fun(flow_files_.at(index))) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ const auto notifier = gsl::finally([this]{ cv_.notify_all(); });
+ try {
+ return fun(flow_files_.at(index));
+ } catch(const std::exception& ex) {
+ logger_->log_warn("Messages::modifyResult exception: %s", ex.what());
+ throw;
+ }
+ }
+
+ size_t addFlowFile() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ flow_files_.emplace_back();
+ return flow_files_.size() - 1;
+ }
+
+ template<typename Func>
+ auto iterateFlowFiles(Func fun) -> utils::void_t<decltype(fun(size_t{0}, flow_files_.front()))> {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (size_t index = 0U; index < flow_files_.size(); index++) {
+ fun(index, flow_files_[index]);
+ }
+ }
+
+ void interrupt() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ interrupted_ = true;
+ cv_.notify_all();
+ gsl_Ensures(interrupted_);
+ }
+
+ bool wasInterrupted() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return interrupted_;
+ }
+};
+
+namespace {
+class ReadCallback : public InputStreamCallback {
+ public:
+ struct rd_kafka_headers_deleter {
+ void operator()(rd_kafka_headers_t* ptr) const noexcept {
+ rd_kafka_headers_destroy(ptr);
+ }
+ };
+
+ using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
+
+ private:
+ void allocate_message_object(const size_t segment_num) const {
+ messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+ // allocate message object to be filled in by the callback in produce()
+ if (flow_file.messages.size() < segment_num + 1) {
+ flow_file.messages.resize(segment_num + 1);
+ }
+ gsl_Ensures(flow_file.messages.size() > segment_num);
+ });
+ }
+
+ static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
+ const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
+ if (!result) { throw std::bad_alloc{}; }
+
+ for (const auto& kv : flow_file.getAttributes()) {
+ if (attribute_name_regex.match(kv.first)) {
+ rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+ }
+ }
+ return rd_kafka_headers_unique_ptr{ result };
+ }
+
+ rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+ const std::shared_ptr<PublishKafka::Messages> messages_ptr_copy = this->messages_;
+ const auto flow_file_index_copy = this->flow_file_index_;
+ const auto logger = logger_;
+ const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num, logger](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+ messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage, logger, flow_file_index_copy](FlowFileResult &flow_file) {
+ auto &message = flow_file.messages.at(segment_num);
+ message.err_code = rkmessage->err;
+ message.status = message.err_code == 0 ? MessageStatus::Success : MessageStatus::Error;
+ if (message.err_code != RD_KAFKA_RESP_ERR_NO_ERROR) {
+ logger->log_warn("delivery callback, flow file #%zu/segment #%zu: %s", flow_file_index_copy, segment_num, rd_kafka_err2str(message.err_code));
+ } else {
+ logger->log_debug("delivery callback, flow file #%zu/segment #%zu: success", flow_file_index_copy, segment_num);
+ }
+ });
+ };
+ // release()d below, deallocated in messageDeliveryCallback
+ auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+ allocate_message_object(segment_num);
+
+ const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
+ const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+ RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
+ if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
+ // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
+ // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
+ (void)callback_ptr.release();
+ } else {
+ // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
+ rd_kafka_headers_destroy(hdrs_copy);
+ }
+ logger_->log_trace("produce enqueued flow file #%zu/segment #%zu: %s", flow_file_index_, segment_num, rd_kafka_err2str(err));
+ return err;
+ }
+
+ public:
+ ReadCallback(const uint64_t max_seg_size,
+ std::string key,
+ rd_kafka_topic_t* const rkt,
+ rd_kafka_t* const rk,
+ const core::FlowFile& flowFile,
+ utils::Regex& attributeNameRegex,
+ std::shared_ptr<PublishKafka::Messages> messages,
+ const size_t flow_file_index,
+ const bool fail_empty_flow_files,
+ std::shared_ptr<logging::Logger> logger)
+ : flow_size_(flowFile.getSize()),
+ max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+ key_(std::move(key)),
+ rkt_(rkt),
+ rk_(rk),
+ hdrs(make_headers(flowFile, attributeNameRegex)),
+ messages_(std::move(messages)),
+ flow_file_index_(flow_file_index),
+ fail_empty_flow_files_(fail_empty_flow_files),
+ logger_(std::move(logger))
+ { }
+
+ ReadCallback(const ReadCallback&) = delete;
+ ReadCallback& operator=(ReadCallback) = delete;
+
+ int64_t process(const std::shared_ptr<io::BaseStream> stream) override {
+ std::vector<unsigned char> buffer;
+
+ buffer.resize(max_seg_size_);
+ read_size_ = 0;
+ status_ = 0;
+ called_ = true;
+
+ gsl_Expects(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+ // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+ const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+ messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+ flow_file.messages.reserve(reserved_msg_capacity);
+ });
+
+ // If the flow file is empty, we still want to send the message, unless fail_empty_flow_files_ is set
+ if (flow_size_ == 0 && !fail_empty_flow_files_) {
+ const auto err = produce(0, buffer, 0);
+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+ status_ = -1;
+ error_ = rd_kafka_err2str(err);
+ }
+ return 0;
+ }
+
+ for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
+ const int readRet = stream->read(buffer.data(), buffer.size());
+ if (readRet < 0) {
+ status_ = -1;
+ error_ = "Failed to read from stream";
+ return read_size_;
+ }
+
+ if (readRet <= 0) { break; }
+
+ const auto err = produce(segment_num, buffer, readRet);
+ if (err) {
+ messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
+ auto& message = flow_file.messages.at(segment_num);
+ message.status = MessageStatus::Error;
+ message.err_code = err;
+ });
+ status_ = -1;
+ error_ = rd_kafka_err2str(err);
+ return read_size_;
+ }
+ read_size_ += readRet;
+ }
+ return read_size_;
+ }
+
+ const uint64_t flow_size_ = 0;
+ const uint64_t max_seg_size_ = 0;
+ const std::string key_;
+ rd_kafka_topic_t* const rkt_ = nullptr;
+ rd_kafka_t* const rk_ = nullptr;
+ const rd_kafka_headers_unique_ptr hdrs; // not null
+ const std::shared_ptr<PublishKafka::Messages> messages_;
+ const size_t flow_file_index_;
+ int status_ = 0;
+ std::string error_;
+ int read_size_ = 0;
+ bool called_ = false;
+ const bool fail_empty_flow_files_ = true;
+ const std::shared_ptr<logging::Logger> logger_;
+};
+
+/**
+ * Message delivery report callback using the richer rd_kafka_message_t object.
+ */
+void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
+ if (rkmessage->_private == nullptr) {
+ return;
+ }
+ // allocated in ReadCallback::produce
+ auto* const func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
+ try {
+ (*func)(rk, rkmessage);
+ } catch (...) { }
+ delete func;
+}
} // namespace
void PublishKafka::initialize() {
@@ -235,20 +549,6 @@
conn_.reset();
}
-/**
- * Message delivery report callback using the richer rd_kafka_message_t object.
- */
-void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
- if (rkmessage->_private == nullptr) {
- return;
- }
- // allocated in PublishKafka::ReadCallback::produce
- auto* func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
- try {
- (*func)(rk, rkmessage);
- } catch (...) { }
- delete func;
-}
bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
std::string value;
@@ -256,14 +556,14 @@
std::string valueConf;
std::array<char, 512U> errstr{};
rd_kafka_conf_res_t result;
- const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
+ const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() };
if (conf_ == nullptr) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
}
- auto key = conn_->getKey();
+ const auto* const key = conn_->getKey();
if (key->brokers_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
@@ -451,7 +751,7 @@
}
// Set the delivery callback
- rd_kafka_conf_set_dr_msg_cb(conf_.get(), &PublishKafka::messageDeliveryCallback);
+ rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);
// Set the logger callback
rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
@@ -579,13 +879,13 @@
}
logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
- auto messages = std::make_shared<Messages>();
+ auto messages = std::make_shared<Messages>(logger_);
// We must add this to the messages set, so that it will be interrupted when notifyStop is called
{
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_set_.emplace(messages);
}
- // We also have to insure that it will be removed once we are done with it
+ // We also have to ensure that it will be removed once we are done with it
const auto messagesSetGuard = gsl::finally([&]() {
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_set_.erase(messages);
@@ -636,8 +936,8 @@
bool failEmptyFlowFiles = true;
context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
- PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
- attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
+ ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
+ attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
session->read(flowFile, &callback);
if (!callback.called_) {
@@ -677,20 +977,20 @@
for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
const auto& message = flow_file.messages[segment_num];
switch (message.status) {
- case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
+ case MessageStatus::InFlight:
success = false;
logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
flowFiles[index]->getUUIDStr(),
segment_num);
break;
- case MessageStatus::MESSAGESTATUS_ERROR:
+ case MessageStatus::Error:
success = false;
logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
flowFiles[index]->getUUIDStr(),
segment_num,
rd_kafka_err2str(message.err_code));
break;
- case MessageStatus::MESSAGESTATUS_SUCCESS:
+ case MessageStatus::Success:
logger_->log_debug("Successfully delivered flow file %s segment %zu",
flowFiles[index]->getUUIDStr(),
segment_num);
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 5d16717..7cc9101 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -50,36 +50,9 @@
namespace minifi {
namespace processors {
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext"
-#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl"
-#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-
// PublishKafka Class
class PublishKafka : public core::Processor {
public:
- // Constructor
- /*!
- * Create a new processor
- */
- explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
- : core::Processor(std::move(name), uuid),
- logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
- interrupted_(false) {
- }
-
- virtual ~PublishKafka() = default;
-
static constexpr char const* ProcessorName = "PublishKafka";
// Supported Properties
@@ -114,229 +87,14 @@
static const core::Relationship Failure;
static const core::Relationship Success;
- // Message
- enum class MessageStatus : uint8_t {
- MESSAGESTATUS_UNCOMPLETE,
- MESSAGESTATUS_ERROR,
- MESSAGESTATUS_SUCCESS
- };
-
- struct MessageResult {
- MessageStatus status = MessageStatus::MESSAGESTATUS_UNCOMPLETE;
- rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_UNKNOWN;
- };
-
- struct FlowFileResult {
- bool flow_file_error = false;
- std::vector<MessageResult> messages;
- };
-
- struct Messages {
- std::mutex mutex;
- std::condition_variable cv;
- std::vector<FlowFileResult> flow_files;
- bool interrupted = false;
-
- void waitForCompletion() {
- std::unique_lock<std::mutex> lock(mutex);
- cv.wait(lock, [this]() -> bool {
- if (interrupted) {
- return true;
- }
- size_t index = 0U;
- return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) {
- index++;
- if (flow_file.flow_file_error) {
- return true;
- }
- return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
- return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE;
- });
- });
- });
- }
-
- void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
- std::unique_lock<std::mutex> lock(mutex);
- fun(flow_files.at(index));
- cv.notify_all();
- }
-
- size_t addFlowFile() {
- std::lock_guard<std::mutex> lock(mutex);
- flow_files.emplace_back();
- return flow_files.size() - 1;
- }
-
- void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
- std::lock_guard<std::mutex> lock(mutex);
- for (size_t index = 0U; index < flow_files.size(); index++) {
- fun(index, flow_files[index]);
- }
- }
-
- void interrupt() {
- std::unique_lock<std::mutex> lock(mutex);
- interrupted = true;
- cv.notify_all();
- }
-
- bool wasInterrupted() {
- std::lock_guard<std::mutex> lock(mutex);
- return interrupted;
- }
- };
-
- // Nest Callback Class for read stream
- class ReadCallback : public InputStreamCallback {
- public:
- struct rd_kafka_headers_deleter {
- void operator()(rd_kafka_headers_t* ptr) const noexcept {
- rd_kafka_headers_destroy(ptr);
- }
- };
-
- using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>;
-
- private:
- void allocate_message_object(const size_t segment_num) const {
- messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
- // allocate message object to be filled in by the callback in produce()
- if (flow_file.messages.size() < segment_num + 1) {
- flow_file.messages.resize(segment_num + 1);
- }
- });
- }
-
- static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) {
- const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) };
- if (!result) { throw std::bad_alloc{}; }
-
- for (const auto& kv : flow_file.getAttributes()) {
- if (attribute_name_regex.match(kv.first)) {
- rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
- }
- }
- return rd_kafka_headers_unique_ptr{ result };
- }
-
- rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
- const std::shared_ptr<Messages> messages_ptr_copy = this->messages_;
- const auto flow_file_index_copy = this->flow_file_index_;
- const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
- messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
- auto &message = flow_file.messages.at(segment_num);
- message.err_code = rkmessage->err;
- message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
- });
- };
- // release()d below, deallocated in PublishKafka::messageDeliveryCallback
- auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
-
- allocate_message_object(segment_num);
-
- const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get());
- const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
- RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END);
- if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
- // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback
- // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it
- (void)callback_ptr.release();
- } else {
- // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them
- rd_kafka_headers_destroy(hdrs_copy);
- }
- return err;
- }
-
- public:
- ReadCallback(const uint64_t max_seg_size,
- std::string key,
- rd_kafka_topic_t * const rkt,
- rd_kafka_t * const rk,
- const core::FlowFile& flowFile,
- utils::Regex &attributeNameRegex,
- std::shared_ptr<Messages> messages,
- const size_t flow_file_index,
- const bool fail_empty_flow_files)
- : flow_size_(flowFile.getSize()),
- max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
- key_(std::move(key)),
- rkt_(rkt),
- rk_(rk),
- hdrs(make_headers(flowFile, attributeNameRegex)),
- messages_(std::move(messages)),
- flow_file_index_(flow_file_index),
- fail_empty_flow_files_(fail_empty_flow_files)
- { }
-
- int64_t process(const std::shared_ptr<io::BaseStream> stream) {
- std::vector<unsigned char> buffer;
-
- buffer.resize(max_seg_size_);
- read_size_ = 0;
- status_ = 0;
- called_ = true;
-
- assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
- // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
- const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
- messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
- flow_file.messages.reserve(reserved_msg_capacity);
- });
-
- // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
- if (flow_size_ == 0 && !fail_empty_flow_files_) {
- produce(0, buffer, 0);
- return 0;
- }
-
- for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
- const int readRet = stream->read(buffer.data(), buffer.size());
- if (readRet < 0) {
- status_ = -1;
- error_ = "Failed to read from stream";
- return read_size_;
- }
-
- if (readRet <= 0) { break; }
-
- const auto err = produce(segment_num, buffer, readRet);
- if (err) {
- messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
- auto& message = flow_file.messages.at(segment_num);
- message.status = MessageStatus::MESSAGESTATUS_ERROR;
- message.err_code = err;
- });
- status_ = -1;
- error_ = rd_kafka_err2str(err);
- return read_size_;
- }
- read_size_ += readRet;
- }
- return read_size_;
- }
-
- const uint64_t flow_size_ = 0;
- const uint64_t max_seg_size_ = 0;
- const std::string key_;
- rd_kafka_topic_t * const rkt_ = nullptr;
- rd_kafka_t * const rk_ = nullptr;
- const rd_kafka_headers_unique_ptr hdrs; // not null
- const std::shared_ptr<Messages> messages_;
- const size_t flow_file_index_;
- int status_ = 0;
- std::string error_;
- int read_size_ = 0;
- bool called_ = false;
- const bool fail_empty_flow_files_ = true;
- };
-
- public:
- bool supportsDynamicProperties() override {
- return true;
+ explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
+ : core::Processor(std::move(name), uuid) {
}
+ ~PublishKafka() override = default;
+
+ bool supportsDynamicProperties() override { return true; }
+
/**
* Function that's executed when the processor is scheduled.
* @param context process context.
@@ -348,25 +106,25 @@
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
void notifyStop() override;
+ class Messages;
+
protected:
bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
private:
- static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
-
- std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<PublishKafka>::getLogger()};
KafkaConnectionKey key_;
std::unique_ptr<KafkaConnection> conn_;
std::mutex connection_mutex_;
- uint32_t batch_size_;
- uint64_t target_batch_payload_size_;
- uint64_t max_flow_seg_size_;
+ uint32_t batch_size_{};
+ uint64_t target_batch_payload_size_{};
+ uint64_t max_flow_seg_size_{};
utils::Regex attributeNameRegex_;
- std::atomic<bool> interrupted_;
+ std::atomic<bool> interrupted_{false};
std::mutex messages_mutex_;
std::set<std::shared_ptr<Messages>> messages_set_;
};
diff --git a/win_build_vs.bat b/win_build_vs.bat
index a2beefa..3a24778 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -64,6 +64,9 @@
if [%%~x] EQU [/D] (
set cmake_build_type=RelWithDebInfo
)
+ if [%%~x] EQU [/DD] (
+ set cmake_build_type=Debug
+ )
if [%%~x] EQU [/CI] (
set "strict_gsl_checks=-DSTRICT_GSL_CHECKS=AUDIT"
)