| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "PublishKafka.h" |
| |
| #include <map> |
| #include <set> |
| #include <string> |
| #include <vector> |
| |
| #include "minifi-cpp/core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| #include "core/Resource.h" |
| #include "range/v3/algorithm/all_of.hpp" |
| #include "rdkafka_utils.h" |
| #include "utils/ProcessorConfigUtils.h" |
| #include "utils/StringUtils.h" |
| #include "minifi-cpp/utils/gsl.h" |
| |
| namespace org::apache::nifi::minifi::processors { |
| |
| namespace { |
| // Message |
/// Delivery state of a single produced message.
enum class MessageStatus : uint8_t { InFlight, Error, Success };

/**
 * Returns the enumerator name of a MessageStatus as a C string.
 * @throws std::runtime_error if the value is not one of the declared enumerators
 */
const char* to_string(const MessageStatus s) {
  const char* name = nullptr;
  // deliberately no default label, so that compilers can warn about unhandled enumerators
  switch (s) {
    case MessageStatus::InFlight: name = "InFlight"; break;
    case MessageStatus::Error: name = "Error"; break;
    case MessageStatus::Success: name = "Success"; break;
  }
  if (name == nullptr) { throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"}; }
  return name;
}
| |
| struct MessageResult { |
| MessageStatus status = MessageStatus::InFlight; |
| rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR; |
| }; |
| |
| struct FlowFileResult { |
| bool flow_file_error = false; |
| std::vector<MessageResult> messages; |
| }; |
| } // namespace |
| |
| class PublishKafka::Messages { |
| std::mutex mutex_; |
| std::condition_variable cv_; |
| std::vector<FlowFileResult> flow_files_; |
| bool interrupted_ = false; |
| const std::shared_ptr<core::logging::Logger> logger_; |
| |
| [[nodiscard]] std::string logStatus(const std::unique_lock<std::mutex>& lock) const { |
| gsl_Expects(lock.owns_lock()); |
| const auto messageresult_ok = [](const MessageResult r) { |
| return r.status == MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; |
| }; |
| const auto messageresult_inflight = [](const MessageResult r) { |
| return r.status == MessageStatus::InFlight && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; |
| }; |
| std::vector<size_t> flow_files_in_flight; |
| std::ostringstream oss; |
| if (interrupted_) { oss << "interrupted, "; } |
| for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) { |
| const auto& [flow_file_error, messages] = flow_files_[ffi]; |
| if (!flow_file_error && ranges::all_of(messages, messageresult_ok)) { |
| continue; // don't log the happy path to reduce log spam |
| } |
| if (!flow_file_error && ranges::all_of(messages, messageresult_inflight)) { |
| flow_files_in_flight.push_back(ffi); |
| continue; // don't log fully in-flight flow files here, log them at the end instead |
| } |
| oss << '[' << ffi << "]: {"; |
| if (flow_file_error) { oss << "error, "; } |
| for (size_t msgi = 0; msgi < messages.size(); ++msgi) { |
| const auto& msg = messages[msgi]; |
| if (messageresult_ok(msg)) { continue; } |
| oss << '<' << msgi << ">: (msg " << to_string(msg.status) << ", " << rd_kafka_err2str(msg.err_code) << "), "; |
| } |
| oss << "}, "; |
| } |
| oss << "in-flight (" << flow_files_in_flight.size() << "): " << utils::string::join(",", flow_files_in_flight); |
| return oss.str(); |
| } |
| |
| public: |
| explicit Messages(std::shared_ptr<core::logging::Logger> logger) : logger_{std::move(logger)} {} |
| |
| void waitForCompletion() { |
| std::unique_lock<std::mutex> lock(mutex_); |
| cv_.wait(lock, [this, &lock] { |
| if (logger_->should_log(core::logging::LOG_LEVEL::trace)) { logger_->log_trace("{}", logStatus(lock)); } |
| return interrupted_ || ranges::all_of(this->flow_files_, [](const FlowFileResult& flow_file) { |
| return flow_file.flow_file_error || ranges::all_of(flow_file.messages, [](const MessageResult& message) { |
| return message.status != MessageStatus::InFlight; |
| }); |
| }); |
| }); |
| } |
| |
| template<typename Func> |
| auto modifyResult(size_t index, const Func& fun) -> decltype(fun(flow_files_.at(index))) { |
| std::unique_lock<std::mutex> lock(mutex_); |
| const auto notifier = gsl::finally([this] { cv_.notify_all(); }); |
| try { |
| return fun(flow_files_.at(index)); |
| } catch (const std::exception& ex) { |
| logger_->log_warn("Messages::modifyResult exception: {}", ex.what()); |
| throw; |
| } |
| } |
| |
| size_t addFlowFile() { |
| std::lock_guard<std::mutex> lock(mutex_); |
| flow_files_.emplace_back(); |
| return flow_files_.size() - 1; |
| } |
| |
| template<typename Func> |
| auto iterateFlowFiles(Func fun) -> std::void_t<decltype(fun(size_t{0}, flow_files_.front()))> { |
| std::lock_guard<std::mutex> lock(mutex_); |
| for (size_t index = 0U; index < flow_files_.size(); index++) { fun(index, flow_files_[index]); } |
| } |
| |
| void interrupt() { |
| std::unique_lock<std::mutex> lock(mutex_); |
| interrupted_ = true; |
| cv_.notify_all(); |
| gsl_Ensures(interrupted_); |
| } |
| |
| bool wasInterrupted() { |
| std::lock_guard<std::mutex> lock(mutex_); |
| return interrupted_; |
| } |
| }; |
| |
| namespace { |
| class ReadCallback { |
| void allocate_message_object(const size_t segment_num) const { |
| messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) { |
| // allocate message object to be filled in by the callback in produce() |
| if (flow_file.messages.size() < segment_num + 1) { flow_file.messages.resize(segment_num + 1); } |
| gsl_Ensures(flow_file.messages.size() > segment_num); |
| }); |
| } |
| |
| static utils::rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, |
| const std::optional<utils::Regex>& attribute_name_regex) { |
| utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)}; |
| if (!result) { throw std::bad_alloc{}; } |
| |
| for (const auto& [attribute_key, attribute_value]: flow_file.getAttributes()) { |
| if (attribute_name_regex && utils::regexMatch(attribute_key, *attribute_name_regex)) { |
| rd_kafka_header_add(result.get(), |
| attribute_key.c_str(), |
| gsl::narrow<ssize_t>(attribute_key.size()), |
| attribute_value.c_str(), |
| gsl::narrow<ssize_t>(attribute_value.size())); |
| } |
| } |
| return result; |
| } |
| |
| rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<std::byte>& buffer, const size_t buflen) const { |
| const std::shared_ptr<PublishKafka::Messages> messages_ptr_copy = this->messages_; |
| const auto flow_file_index_copy = this->flow_file_index_; |
| const auto logger = logger_; |
| const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num, logger](rd_kafka_t* /*rk*/, |
| const rd_kafka_message_t* rkmessage) { |
| messages_ptr_copy->modifyResult(flow_file_index_copy, |
| [segment_num, rkmessage, logger, flow_file_index_copy](FlowFileResult& flow_file) { |
| auto& [status, err_code] = flow_file.messages.at(segment_num); |
| err_code = rkmessage->err; |
| status = err_code == 0 ? MessageStatus::Success : MessageStatus::Error; |
| if (err_code != RD_KAFKA_RESP_ERR_NO_ERROR) { |
| logger->log_warn("delivery callback, flow file #{}/segment #{}: {}", |
| flow_file_index_copy, |
| segment_num, |
| rd_kafka_err2str(err_code)); |
| } else { |
| logger->log_debug("delivery callback, flow file #{}/segment #{}: success", flow_file_index_copy, segment_num); |
| } |
| }); |
| }; |
| // release()d below, deallocated in PublishKafka::messageDeliveryCallback |
| auto callback_ptr = |
| std::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback)); |
| |
| allocate_message_object(segment_num); |
| |
| auto hdrs_copy = utils::rd_kafka_headers_unique_ptr{rd_kafka_headers_copy(hdrs.get())}; |
| const auto err = rd_kafka_producev(rk_, |
| RD_KAFKA_V_RKT(rkt_), |
| RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), |
| RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), |
| RD_KAFKA_V_VALUE(buffer.data(), buflen), |
| RD_KAFKA_V_HEADERS(hdrs_copy.get()), |
| RD_KAFKA_V_KEY(key_.c_str(), key_.size()), |
| RD_KAFKA_V_OPAQUE(callback_ptr.get()), |
| RD_KAFKA_V_END); |
| if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { |
| // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback |
| // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it |
| std::ignore = hdrs_copy.release(); |
| // in case of success, rd_kafka_producev takes ownership of the headers, so we no longer need to delete it |
| std::ignore = callback_ptr.release(); |
| } |
| logger_->log_trace("produce enqueued flow file #{}/segment #{}: {}", // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks) |
| flow_file_index_, |
| segment_num, |
| rd_kafka_err2str(err)); |
| return err; |
| } |
| |
| public: |
| ReadCallback(const uint64_t max_seg_size, std::string key, rd_kafka_topic_t* const rkt, rd_kafka_t* const rk, |
| const core::FlowFile& flowFile, const std::optional<utils::Regex>& attributeNameRegex, |
| std::shared_ptr<PublishKafka::Messages> messages, const size_t flow_file_index, const bool fail_empty_flow_files, |
| std::shared_ptr<core::logging::Logger> logger) |
| : flow_size_(flowFile.getSize()), |
| max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size), |
| key_(std::move(key)), |
| rkt_(rkt), |
| rk_(rk), |
| hdrs(make_headers(flowFile, attributeNameRegex)), |
| messages_(std::move(messages)), |
| flow_file_index_(flow_file_index), |
| fail_empty_flow_files_(fail_empty_flow_files), |
| logger_(std::move(logger)) {} |
| |
| ReadCallback(ReadCallback&&) = delete; |
| ReadCallback(const ReadCallback&) = delete; |
| ReadCallback& operator=(ReadCallback&&) = delete; |
| ReadCallback& operator=(const ReadCallback&) = delete; |
| ~ReadCallback() = default; |
| |
| io::IoResult operator()(const std::shared_ptr<io::InputStream>& stream) { |
| std::vector<std::byte> buffer; |
| |
| buffer.resize(max_seg_size_); |
| read_size_ = 0; |
| status_ = 0; |
| called_ = true; |
| |
| gsl_Expects(max_seg_size_ != 0 || (flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0")); |
| // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases |
| const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_); |
| messages_->modifyResult(flow_file_index_, |
| [reserved_msg_capacity](FlowFileResult& flow_file) { flow_file.messages.reserve(reserved_msg_capacity); }); |
| |
| // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_ |
| if (flow_size_ == 0 && !fail_empty_flow_files_) { |
| const auto err = produce(0, buffer, 0); |
| if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { |
| status_ = -1; |
| error_ = rd_kafka_err2str(err); |
| } |
| return io::IoResult::zero(); |
| } |
| |
| for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) { |
| const auto readRet = stream->read(buffer); |
| if (io::isError(readRet)) { |
| status_ = -1; |
| error_ = "Failed to read from stream"; |
| return io::IoResult::zero(); |
| } |
| if (readRet == 0) { break; } |
| |
| if (const auto err = produce(segment_num, buffer, readRet)) { |
| messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) { |
| auto& [status, err_code] = flow_file.messages.at(segment_num); |
| status = MessageStatus::Error; |
| err_code = err; |
| }); |
| status_ = -1; |
| error_ = rd_kafka_err2str(err); |
| return io::IoResult::zero(); |
| } |
| read_size_ += gsl::narrow<uint32_t>(readRet); |
| } |
| return io::IoResult::from(gsl::narrow<uint64_t>(read_size_)); |
| } |
| |
| const uint64_t flow_size_ = 0; |
| const uint64_t max_seg_size_ = 0; |
| const std::string key_; |
| rd_kafka_topic_t* const rkt_ = nullptr; |
| rd_kafka_t* const rk_ = nullptr; |
| const gsl::not_null<utils::rd_kafka_headers_unique_ptr> hdrs; |
| const std::shared_ptr<PublishKafka::Messages> messages_; |
| const size_t flow_file_index_; |
| int status_ = 0; |
| std::string error_; |
| uint32_t read_size_ = 0; |
| bool called_ = false; |
| const bool fail_empty_flow_files_ = true; |
| const std::shared_ptr<core::logging::Logger> logger_; |
| }; |
| |
| /** |
| * Message delivery report callback using the richer rd_kafka_message_t object. |
| */ |
| void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) { |
| if (rkmessage->_private == nullptr) { return; } |
| // allocated in ReadCallback::produce |
| const auto* const func = static_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private); |
| try { |
| (*func)(rk, rkmessage); |
| } catch (...) {} |
| delete func; // NOLINT(cppcoreguidelines-owning-memory) |
| } |
| } // namespace |
| |
| void PublishKafka::initialize() { |
| setSupportedProperties(Properties); |
| setSupportedRelationships(Relationships); |
| } |
| |
| void PublishKafka::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { |
| interrupted_ = false; |
| |
| // Try to get a KafkaConnection |
| std::string client_id = utils::parseProperty(context, ClientName); |
| std::string brokers = utils::parseProperty(context, SeedBrokers); |
| // Get some properties not (only) used directly to set up librdkafka |
| |
| // Batch Size |
| batch_size_ = utils::parseU64Property(context, BatchSize); |
| logger_->log_debug("PublishKafka: Batch Size [{}]", batch_size_); |
| |
| // Target Batch Payload Size |
| target_batch_payload_size_ = utils::parseDataSizeProperty(context, TargetBatchPayloadSize); |
| logger_->log_debug("PublishKafka: Target Batch Payload Size [{}]", target_batch_payload_size_); |
| |
| // Max Flow Segment Size |
| max_flow_seg_size_ = utils::parseDataSizeProperty(context, MaxFlowSegSize); |
| logger_->log_debug("PublishKafka: Max Flow Segment Size [{}]", max_flow_seg_size_); |
| |
| // Attributes to Send as Headers |
| attributeNameRegex_ = context.getProperty(AttributeNameRegex) |
| | utils::transform([](auto pattern_str) { return utils::Regex{std::move(pattern_str)}; }) |
| | utils::toOptional(); |
| |
| key_.brokers_ = brokers; |
| key_.client_id_ = client_id; |
| |
| conn_ = std::make_unique<KafkaConnection>(key_); |
| configureNewConnection(context); |
| |
| if (const auto message_key_field = context.getProperty(MessageKeyField); message_key_field && !message_key_field->empty()) { |
| logger_->log_error("The {} property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead.", MessageKeyField.name); |
| } |
| |
| logger_->log_debug("Successfully configured PublishKafka"); |
| } |
| |
| void PublishKafka::notifyStop() { |
| logger_->log_debug("notifyStop called"); |
| interrupted_ = true; |
| { |
| // Normally when we need both connection_mutex_ and messages_mutex_, we need to take connection_mutex_ first to avoid a |
| // deadlock. It's not possible to do that here, because we need to interrupt the messages while onTrigger is running and |
| // holding connection_mutex_. For this reason, we take messages_mutex_ only, interrupt the messages, then release the |
| // lock to let a possibly running onTrigger take it and finish. After onTrigger finishes, we can take connection_mutex_ |
| // and close the connection without needing to wait for message finishes/timeouts in onTrigger. A possible new onTrigger |
| // between our critical sections won't produce more messages because we set interrupted_ = true above. |
| std::lock_guard<std::mutex> lock(messages_mutex_); |
| for (auto& messages: messages_set_) { messages->interrupt(); } |
| } |
| std::lock_guard<std::mutex> conn_lock(connection_mutex_); |
| conn_.reset(); |
| } |
| |
| bool PublishKafka::configureNewConnection(core::ProcessContext& context) { |
| std::array<char, 512U> err_chars{}; |
| rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK; |
| constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error result: "; |
| |
| utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()}; |
| if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); } |
| |
| const auto* const key = conn_->getKey(); |
| |
| if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); } |
| result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| |
| if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty"); } |
| result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| |
| if (const auto debug_context = context.getProperty(DebugContexts)) { |
| result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: debug [{}]", *debug_context); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| if (const auto max_message_size = context.getProperty(MaxMessageSize); max_message_size && !max_message_size->empty()) { |
| result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: message.max.bytes [{}]", *max_message_size); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| if (const auto queue_buffer_max_message = utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) { |
| if (*queue_buffer_max_message < batch_size_) { |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message"); |
| } |
| |
| const auto value = std::to_string(*queue_buffer_max_message); |
| result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", value); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| if (const auto queue_buffer_max_size = utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) { |
| auto valInt = *queue_buffer_max_size / 1024; |
| auto valueConf = std::to_string(valInt); |
| result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", valueConf.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", valueConf); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| if (const auto queue_buffer_max_time = utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) { |
| auto valueConf = std::to_string(queue_buffer_max_time->count()); |
| result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| if (const auto batch_size = utils::parseOptionalU64Property(context, BatchSize)) { |
| auto value = std::to_string(*batch_size); |
| result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: batch.num.messages [{}]", value); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| if (const auto compress_codec = context.getProperty(CompressCodec); compress_codec && !compress_codec->empty() && *compress_codec != "none") { |
| result = rd_kafka_conf_set(conf_.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: compression.codec [{}]", *compress_codec); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } |
| |
| setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get())); |
| |
| // Add all the dynamic properties as librdkafka configurations |
| const auto& dynamic_prop_keys = context.getDynamicPropertyKeys(); |
| logger_->log_info("PublishKafka registering {} librdkafka dynamic properties", dynamic_prop_keys.size()); |
| |
| for (const auto& prop_key : dynamic_prop_keys) { |
| if (const auto dynamic_property_value = context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && !dynamic_property_value->empty()) { |
| logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, *dynamic_property_value); |
| result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value->c_str(), err_chars.data(), err_chars.size()); |
| if (result != RD_KAFKA_CONF_OK) { |
| auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| } else { |
| logger_->log_warn( |
| "PublishKafka Dynamic Property '{}' is empty and therefore will not " |
| "be configured", |
| prop_key); |
| } |
| } |
| |
| // Set the delivery callback |
| rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback); |
| |
| // Set the logger callback |
| rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback); |
| |
| // The producer takes ownership of the configuration, we must not free it |
| utils::rd_kafka_producer_unique_ptr producer{ |
| rd_kafka_new(RD_KAFKA_PRODUCER, conf_.release(), err_chars.data(), err_chars.size())}; |
| if (producer == nullptr) { |
| auto error_msg = utils::string::join_pack("Failed to create Kafka producer ", err_chars.data()); |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); |
| } |
| |
| conn_->setConnection(std::move(producer)); |
| |
| return true; |
| } |
| |
| bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::string& topic_name, |
| const std::shared_ptr<core::FlowFile>& flow_file) const { |
| utils::rd_kafka_topic_conf_unique_ptr topic_conf_{rd_kafka_topic_conf_new()}; |
| if (topic_conf_ == nullptr) { |
| logger_->log_error("Failed to create rd_kafka_topic_conf_t object"); |
| return false; |
| } |
| |
| rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK; |
| std::array<char, 512U> err_chars{}; |
| |
| if (auto delivery_guarantee = context.getProperty(DeliveryGuarantee, flow_file.get())) { |
| /* |
| * Because of a previous error in this processor, the default value of this property was "DELIVERY_ONE_NODE". |
| * As this is not a valid value for "request.required.acks", the following rd_kafka_topic_conf_set call failed, |
| * but because of another error, this failure was silently ignored, meaning that the default value for |
| * "request.required.acks" did not change, and thus remained "-1". This means that having "DELIVERY_ONE_NODE" as |
| * the value of this property actually caused the processor to wait for delivery ACKs from ALL nodes, instead |
| * of just one. In order not to break configurations generated with earlier versions and keep the same behaviour |
| * as they had, we have to map "DELIVERY_ONE_NODE" to "-1" here. |
| */ |
| if (*delivery_guarantee == "DELIVERY_ONE_NODE") { |
| delivery_guarantee = "-1"; |
| logger_->log_warn("Using DELIVERY_ONE_NODE as the Delivery Guarantee property is deprecated and is translated to -1 " |
| "(block until message is committed by all in sync replicas) for backwards compatibility. " |
| "If you want to wait for one acknowledgment use '1' as the property."); |
| } |
| result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.required.acks", delivery_guarantee->c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: request.required.acks [{}]", *delivery_guarantee); |
| if (result != RD_KAFKA_CONF_OK) { |
| logger_->log_error("PublishKafka: configure request.required.acks error result [{}]", err_chars.data()); |
| return false; |
| } |
| } |
| |
| if (const auto request_timeout = utils::parseOptionalDurationProperty(context, RequestTimeOut)) { |
| auto valueConf = std::to_string(request_timeout->count()); |
| result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.timeout.ms", valueConf.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: request.timeout.ms [{}]", valueConf); |
| if (result != RD_KAFKA_CONF_OK) { |
| logger_->log_error("PublishKafka: configure request.timeout.ms error result [{}]", err_chars.data()); |
| return false; |
| } |
| } |
| |
| if (const auto message_timeout = utils::parseOptionalDurationProperty(context, MessageTimeOut)) { |
| auto valueConf = std::to_string(message_timeout->count()); |
| result = rd_kafka_topic_conf_set(topic_conf_.get(), "message.timeout.ms", valueConf.c_str(), err_chars.data(), err_chars.size()); |
| logger_->log_debug("PublishKafka: message.timeout.ms [{}]", valueConf); |
| if (result != RD_KAFKA_CONF_OK) { |
| logger_->log_error("PublishKafka: configure message.timeout.ms error result [{}]", err_chars.data()); |
| return false; |
| } |
| } |
| |
| // The topic takes ownership of the configuration, we must not free it |
| utils::rd_kafka_topic_unique_ptr topic_reference{ |
| rd_kafka_topic_new(conn_->getConnection(), topic_name.c_str(), topic_conf_.release())}; |
| if (topic_reference == nullptr) { |
| const rd_kafka_resp_err_t resp_err = rd_kafka_last_error(); |
| logger_->log_error("PublishKafka: failed to create topic {}, error: {}", topic_name.c_str(), rd_kafka_err2str(resp_err)); |
| return false; |
| } |
| |
| const auto kafka_topic_ref = |
| std::make_shared<KafkaTopic>(std::move(topic_reference)); // KafkaTopic takes ownership of topic_reference |
| conn_->putTopic(topic_name, kafka_topic_ref); |
| |
| return true; |
| } |
| |
| std::optional<utils::net::SslData> PublishKafka::getSslData(core::ProcessContext& context) const { |
| if (auto result = KafkaProcessorBase::getSslData(context); result) { return result; } |
| |
| utils::net::SslData ssl_data; |
| if (auto security_ca = context.getProperty(SecurityCA)) { ssl_data.ca_loc = *security_ca; } |
| if (auto security_cert = context.getProperty(SecurityCert)) { ssl_data.cert_loc = *security_cert; } |
| if (auto security_private_key = context.getProperty(SecurityPrivateKey)) { ssl_data.key_loc = *security_private_key; } |
| if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord)) { |
| ssl_data.key_pw = *security_private_key_pass; |
| } |
| return ssl_data; |
| } |
| |
| void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { |
| // Check whether we have been interrupted |
| if (interrupted_) { |
| logger_->log_info("The processor has been interrupted, not running onTrigger"); |
| context.yield(); |
| return; |
| } |
| |
| std::lock_guard<std::mutex> lock_connection(connection_mutex_); |
| logger_->log_debug("PublishKafka onTrigger"); |
| |
| // Collect FlowFiles to process |
| uint64_t actual_bytes = 0U; |
| std::vector<std::shared_ptr<core::FlowFile>> flowFiles; |
| for (uint64_t i = 0; i < batch_size_; i++) { |
| std::shared_ptr<core::FlowFile> flowFile = session.get(); |
| if (flowFile == nullptr) { break; } |
| actual_bytes += flowFile->getSize(); |
| flowFiles.emplace_back(std::move(flowFile)); |
| if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) { break; } |
| } |
| if (flowFiles.empty()) { |
| context.yield(); |
| return; |
| } |
| logger_->log_debug("Processing {} flow files with a total size of {} B", flowFiles.size(), actual_bytes); |
| |
| auto messages = std::make_shared<Messages>(logger_); |
| // We must add this to the messages set, so that it will be interrupted when notifyStop is called |
| { |
| std::lock_guard<std::mutex> lock(messages_mutex_); |
| messages_set_.emplace(messages); |
| } |
| // We also have to ensure that it will be removed once we are done with it |
| const auto messagesSetGuard = gsl::finally([&]() { |
| std::lock_guard<std::mutex> lock(messages_mutex_); |
| messages_set_.erase(messages); |
| }); |
| |
| // Process FlowFiles |
| for (auto& flowFile: flowFiles) { |
| const size_t flow_file_index = messages->addFlowFile(); |
| |
| // Get Topic (FlowFile-dependent EL property) |
| const auto topic = context.getProperty(Topic, flowFile.get()); |
| if (topic) { |
| logger_->log_debug("PublishKafka: topic for flow file {} is '{}'", flowFile->getUUIDStr(), *topic); |
| } else { |
| logger_->log_error("Flow file {} does not have a valid Topic", flowFile->getUUIDStr()); |
| messages->modifyResult(flow_file_index, |
| [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; }); |
| continue; |
| } |
| |
| // Add topic to the connection if needed |
| if (!conn_->hasTopic(*topic)) { |
| if (!createNewTopic(context, *topic, flowFile)) { |
| logger_->log_error("Failed to add topic {}", *topic); |
| messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) { |
| flow_file_result.flow_file_error = true; |
| }); |
| continue; |
| } |
| } |
| |
| std::string kafkaKey = context.getProperty(KafkaKey, flowFile.get()).value_or(flowFile->getUUIDStr()); |
| |
| logger_->log_debug("PublishKafka: Message Key [{}]", kafkaKey); |
| |
| auto thisTopic = conn_->getTopic(*topic); |
| if (thisTopic == nullptr) { |
| logger_->log_error("Topic {} is invalid", *topic); |
| messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) { |
| flow_file_result.flow_file_error = true; |
| }); |
| continue; |
| } |
| |
| bool failEmptyFlowFiles = utils::parseBoolProperty(context, FailEmptyFlowFiles); |
| |
| ReadCallback callback(max_flow_seg_size_, |
| kafkaKey, |
| thisTopic->getTopic(), |
| conn_->getConnection(), |
| *flowFile, |
| attributeNameRegex_, |
| messages, |
| flow_file_index, |
| failEmptyFlowFiles, |
| logger_); |
| session.read(flowFile, std::ref(callback)); |
| |
| if (!callback.called_) { |
| // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims |
| callback(nullptr); |
| } |
| |
| if (flowFile->getSize() == 0 && failEmptyFlowFiles) { |
| logger_->log_debug( |
| "Deprecated behavior, use connections to drop empty flow files! " |
| "Failing empty flow file with uuid: {}", |
| flowFile->getUUIDStr()); |
| messages->modifyResult(flow_file_index, |
| [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; }); |
| } |
| |
| if (callback.status_ < 0) { |
| logger_->log_error("Failed to send flow to kafka topic {}, error: {}", *topic, callback.error_); |
| messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) { |
| flow_file_result.flow_file_error = true; |
| }); |
| continue; |
| } |
| } |
| |
| logger_->log_trace("PublishKafka::onTrigger waitForCompletion start"); |
| messages->waitForCompletion(); |
| if (messages->wasInterrupted()) { |
| logger_->log_warn( |
| "Waiting for delivery confirmation was interrupted, some flow files " |
| "might be routed to Failure, even if they were successfully " |
| "delivered."); |
| } |
| logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish"); |
| |
| messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) { |
| bool success = false; |
| if (flow_file.flow_file_error) { |
| success = false; |
| } else { |
| success = true; |
| for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) { |
| const auto& message = flow_file.messages[segment_num]; |
| switch (message.status) { |
| case MessageStatus::InFlight: |
| success = false; |
| logger_->log_error( |
| "Waiting for delivery confirmation was interrupted for flow " |
| "file {} segment {}", |
| flowFiles[index]->getUUIDStr(), |
| segment_num); |
| break; |
| case MessageStatus::Error: |
| success = false; |
| logger_->log_error("Failed to deliver flow file {} segment {}, error: {}", |
| flowFiles[index]->getUUIDStr(), |
| segment_num, |
| rd_kafka_err2str(message.err_code)); |
| break; |
| case MessageStatus::Success: |
| logger_->log_debug("Successfully delivered flow file {} segment {}", |
| flowFiles[index]->getUUIDStr(), |
| segment_num); |
| break; |
| } |
| } |
| } |
| if (success) { |
| session.transfer(flowFiles[index], Success); |
| } else { |
| session.penalize(flowFiles[index]); |
| session.transfer(flowFiles[index], Failure); |
| } |
| }); |
| } |
| |
// Registers PublishKafka as a Processor resource.
// NOTE(review): the exact effect is defined by the REGISTER_RESOURCE macro (presumably in core/Resource.h) - confirm there.
REGISTER_RESOURCE(PublishKafka, Processor);
| |
| } // namespace org::apache::nifi::minifi::processors |