blob: adbff6630ada697f561c50a9eb31d1c07cbbc512 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeKafka.h"
#include <algorithm>
#include <ranges>
#include "minifi-cpp/core/FlowFile.h"
#include "core/ProcessSession.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/Resource.h"
#include "utils/OptionalUtils.h"
#include "utils/AttributeErrors.h"
#include "utils/ProcessorConfigUtils.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/expected.h"
using namespace std::literals::chrono_literals;
/// Hash support for ConsumeKafka::KafkaMessageLocation so it can serve as a key in unordered containers.
/// The result combines the hash of the topic name with the hash of the partition number.
template<>
struct std::hash<org::apache::nifi::minifi::processors::ConsumeKafka::KafkaMessageLocation> {
  size_t operator()(const org::apache::nifi::minifi::processors::ConsumeKafka::KafkaMessageLocation& message_location) const noexcept {
    namespace minifi_utils = org::apache::nifi::minifi::utils;
    const auto topic_hash = std::hash<std::string_view>{}(message_location.topic);
    const auto partition_hash = std::hash<int32_t>{}(message_location.partition);
    return minifi_utils::hash_combine(topic_hash, partition_hash);
  }
};
namespace org::apache::nifi::minifi::processors {
// Validates the Max Poll Time property: the input is accepted only if it parses as a duration
// within the 0 ms .. 4 s bounds handed to parseDurationMinMax.
// The upper limit is 4 seconds because Watchdog would potentially start reporting issues
// with the processor health otherwise.
bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const std::string_view input) const {
  return parsing::parseDurationMinMax<std::chrono::nanoseconds>(input, 0ms, 4s).has_value();
}
// Registers the processor's supported properties (Properties) and relationships (Relationships).
void ConsumeKafka::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Reads the processor properties into member state, sets up the Kafka consumer connection and
 * validates property/connection combinations.
 *
 * Throws a PROCESS_SCHEDULE_EXCEPTION when
 *  - both a message demarcator and header-to-attribute extraction are configured, or
 *  - the processor has incoming connections while the commit policy is not CommitFromIncomingFlowFiles.
 */
void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  // Required properties
  topic_names_ = utils::string::splitAndTrim(utils::parseProperty(context, TopicNames), ",");
  topic_name_format_ = utils::parseEnumProperty<consume_kafka::TopicNameFormatEnum>(context, TopicNameFormat);
  commit_policy_ = utils::parseEnumProperty<consume_kafka::CommitPolicyEnum>(context, CommitPolicy);
  key_attribute_encoding_ = utils::parseEnumProperty<utils::KafkaEncoding>(context, KeyAttributeEncoding);
  message_header_encoding_ = utils::parseEnumProperty<utils::KafkaEncoding>(context, MessageHeaderEncoding);
  duplicate_header_handling_ = utils::parseEnumProperty<consume_kafka::MessageHeaderPolicyEnum>(context, DuplicateHeaderHandling);
  max_poll_time_milliseconds_ = utils::parseDurationProperty(context, MaxPollTime);
  max_poll_records_ = gsl::narrow<uint32_t>(utils::parseU64Property(context, MaxPollRecords));

  // Optional properties
  message_demarcator_ = utils::parseOptionalProperty(context, MessageDemarcator);
  headers_to_add_as_attributes_ = parseOptionalProperty(context, HeadersToAddAsAttributes)
      | utils::transform([](const std::string& headers_to_add_str) {
          // The property holds a comma separated list of header names
          return utils::string::splitAndTrim(headers_to_add_str, ",");
        });

  const bool merging_with_header_extraction = message_demarcator_ && headers_to_add_as_attributes_;
  if (merging_with_header_extraction) {
    logger_->log_error("Message merging with header extraction is not yet supported");
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Message merging with header extraction is not yet supported");
  }

  configureNewConnection(context);

  if (commit_policy_ == consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
    // This policy is driven by incoming flow files, so the processor is set to trigger when empty as well
    setTriggerWhenEmpty(true);
    return;
  }
  if (!context.hasIncomingConnections()) { return; }

  // Every other commit policy rejects incoming connections
  const auto policy_name = magic_enum::enum_name(commit_policy_);
  logger_->log_error("Incoming connections are not allowed with {}", policy_name);
  throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Incoming connections are not allowed with {}", policy_name));
}
namespace {
/**
 * Rebalance callback handed to librdkafka (see rd_kafka_conf_set_rebalance_cb in configureNewConnection).
 *
 * @param rk         the consumer handle the rebalance belongs to
 * @param trigger    RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS or an error code
 * @param partitions the partitions being assigned or revoked
 *
 * On assignment the given partitions are assigned to the consumer. On revocation offsets for the
 * partitions are committed synchronously and the assignment is cleared. Any other trigger is treated
 * as a failure and the assignment is cleared as well.
 */
void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
  // Cooperative, incremental assignment is not supported in the current librdkafka version
  const std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
  logger->log_debug("Rebalance triggered.");
  rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR;
  switch (trigger) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
      logger->log_debug("assigned:");
      if (logger->should_log(core::logging::LOG_LEVEL::debug)) { utils::print_topics_list(*logger, *partitions); }
      assign_error = rd_kafka_assign(rk, partitions);
      break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
      logger->log_debug("revoked:");
      // Sync commit, maybe unnecessary. The outcome is reported instead of being silently dropped.
      if (const auto commit_error = rd_kafka_commit(rk, partitions, /* async = */ 0); RD_KAFKA_RESP_ERR_NO_ERROR != commit_error) {
        logger->log_debug("commit on revoke did not succeed: {}", rd_kafka_err2str(commit_error));
      }
      if (logger->should_log(core::logging::LOG_LEVEL::debug)) { utils::print_topics_list(*logger, *partitions); }
      assign_error = rd_kafka_assign(rk, nullptr);
      break;

    default:
      logger->log_debug("failed: {}", rd_kafka_err2str(trigger));
      assign_error = rd_kafka_assign(rk, nullptr);
      break;
  }
  // Only report an assign failure when the assign call actually failed
  if (RD_KAFKA_RESP_ERR_NO_ERROR != assign_error) {
    logger->log_error("assign failure: {}", rd_kafka_err2str(assign_error));
  }
}
}  // namespace
void ConsumeKafka::createTopicPartitionList() {
kf_topic_partition_list_ = utils::rd_kafka_topic_partition_list_unique_ptr{
rd_kafka_topic_partition_list_new(gsl::narrow<int>(topic_names_.size()))};
if (topic_name_format_ == consume_kafka::TopicNameFormatEnum::Patterns) {
for (const std::string& topic: topic_names_) {
const std::string regex_format = "^" + topic;
rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), regex_format.c_str(), RD_KAFKA_PARTITION_UA);
}
} else {
for (const std::string& topic: topic_names_) {
rd_kafka_topic_partition_list_add(kf_topic_partition_list_.get(), topic.c_str(), RD_KAFKA_PARTITION_UA);
}
}
// Subscribe to topic set using balanced consumer groups
// Subscribing from the same process without an inbetween unsubscribe call
// Does not seem to be triggering a rebalance (maybe librdkafka bug?)
// This might happen until the cross-overship between processors and connections are settled
rd_kafka_resp_err_t subscribe_response = rd_kafka_subscribe(consumer_.get(), kf_topic_partition_list_.get());
if (RD_KAFKA_RESP_ERR_NO_ERROR != subscribe_response) {
logger_->log_error("rd_kafka_subscribe error {}: {}", magic_enum::enum_underlying(subscribe_response), rd_kafka_err2str(subscribe_response));
}
}
/**
 * Copies every dynamic property of the processor into the Kafka configuration object (conf_),
 * using the property name as the configuration field name. Each applied key/value pair is logged at info level.
 * Does nothing when there are no dynamic properties.
 */
void ConsumeKafka::extendConfigFromDynamicProperties(const core::ProcessContext& context) const {
  using utils::setKafkaConfigurationField;
  const std::vector<std::string> property_names = context.getDynamicPropertyKeys();
  if (!property_names.empty()) {
    logger_->log_info("Loading {} extra kafka configuration fields from ConsumeKafka dynamic properties:", property_names.size());
    for (const std::string& property_name : property_names) {
      // The key was just listed, so a missing value is unexpected and reported by throwing
      const std::string property_value = context.getDynamicProperty(property_name)
          | utils::orThrow(fmt::format("This shouldn't happen, dynamic property {} is expected because we just queried the list of dynamic properties", property_name));
      logger_->log_info("{}: {}", property_name.c_str(), property_value.c_str());
      setKafkaConfigurationField(*conf_, property_name, property_value);
    }
  }
}
/**
 * Creates a fresh Kafka configuration from the processor properties, creates the consumer from it,
 * subscribes to the configured topics and redirects the main poll queue to the consumer queue.
 *
 * Throws a PROCESS_SCHEDULE_EXCEPTION if the configuration object or the consumer cannot be created.
 * Failures of rd_kafka_committed and rd_kafka_poll_set_consumer are logged only.
 */
void ConsumeKafka::configureNewConnection(core::ProcessContext& context) {
  using utils::setKafkaConfigurationField;

  conf_ = {rd_kafka_conf_new(), utils::rd_kafka_conf_deleter()};
  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }

  // Set rebalance callback for use with coordinated consumer group balancing
  // Rebalance handlers are needed for the initial configuration of the consumer
  // If they are not set, offset reset is ignored and polling produces messages
  // Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that
  // responsibility to the application's rebalance_cb.
  if (commit_policy_ != consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
    rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
  }

  // Uncomment this for librdkafka debug logs:
  // logger_->log_info("Enabling all debug logs for kafka consumer.");
  // setKafkaConfigurationField(*conf_, "debug", "all");

  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));

  setKafkaConfigurationField(*conf_, "bootstrap.servers", utils::parseProperty(context, KafkaBrokers));
  setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
  setKafkaConfigurationField(*conf_,
      "auto.offset.reset",
      std::string(magic_enum::enum_name(utils::parseEnumProperty<consume_kafka::OffsetResetPolicyEnum>(context, OffsetReset))));
  // Automatic committing and offset storing are enabled only for the AutoCommit policy
  setKafkaConfigurationField(*conf_, "enable.auto.commit", std::to_string(commit_policy_ == consume_kafka::CommitPolicyEnum::AutoCommit));
  setKafkaConfigurationField(*conf_, "enable.auto.offset.store", std::to_string(commit_policy_ == consume_kafka::CommitPolicyEnum::AutoCommit));
  setKafkaConfigurationField(*conf_, "isolation.level", utils::parseBoolProperty(context, HonorTransactions) ? "read_committed" : "read_uncommitted");
  setKafkaConfigurationField(*conf_, "group.id", utils::parseProperty(context, GroupID));
  setKafkaConfigurationField(*conf_, "client.id", this->getUUIDStr());
  setKafkaConfigurationField(*conf_, "session.timeout.ms", std::to_string(utils::parseDurationProperty(context, SessionTimeout).count()));
  // Twice the default, arbitrarily chosen
  setKafkaConfigurationField(*conf_, "max.poll.interval.ms", "600000");

  // Dynamic properties are applied last, after the fields set above
  extendConfigFromDynamicProperties(context);

  std::array<char, 512U> errstr{};
  // rd_kafka_new takes ownership of the configuration object only on success, so ownership is
  // kept in conf_ until the consumer is known to exist. Otherwise the configuration would leak on failure.
  consumer_ = {rd_kafka_new(RD_KAFKA_CONSUMER, conf_.get(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter()};
  if (consumer_ == nullptr) {
    const std::string error_msg{errstr.data()};
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka consumer " + error_msg);
  }
  // The consumer now owns the configuration object, it must not be freed by conf_
  (void)conf_.release();

  createTopicPartitionList();

  // Changing the partition list should happen only as part of the initialization of offsets
  // a function like `rd_kafka_position()` might have unexpected effects
  // for instance when a consumer gets assigned a partition it used to
  // consume at an earlier rebalance.
  //
  // As far as I understand, instead of rd_kafka_position() an rd_kafka_committed() call is preferred here,
  // as it properly fetches offsets from the broker
  if (const auto retrieved_committed = rd_kafka_committed(consumer_.get(), kf_topic_partition_list_.get(), METADATA_COMMUNICATIONS_TIMEOUT_MS); RD_KAFKA_RESP_ERR_NO_ERROR != retrieved_committed) {
    logger_->log_error("Retrieving committed offsets for topics+partitions failed {}: {}",
        magic_enum::enum_underlying(retrieved_committed),
        rd_kafka_err2str(retrieved_committed));
  }

  if (rd_kafka_resp_err_t poll_set_consumer_response = rd_kafka_poll_set_consumer(consumer_.get()); RD_KAFKA_RESP_ERR_NO_ERROR != poll_set_consumer_response) {
    logger_->log_error("rd_kafka_poll_set_consumer error {}: {}",
        magic_enum::enum_underlying(poll_set_consumer_response),
        rd_kafka_err2str(poll_set_consumer_response));
  }
}
std::string ConsumeKafka::extractMessage(const rd_kafka_message_t& rkmessage) {
if (RD_KAFKA_RESP_ERR_NO_ERROR != rkmessage.err) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION,
"ConsumeKafka: received error message from broker: " + std::to_string(rkmessage.err) + " " + rd_kafka_err2str(rkmessage.err));
}
return {static_cast<char*>(rkmessage.payload), rkmessage.len};
}
std::unordered_map<ConsumeKafka::KafkaMessageLocation, ConsumeKafka::MessageBundle> ConsumeKafka::pollKafkaMessages() {
std::unordered_map<KafkaMessageLocation, MessageBundle> message_bundles;
const auto start = std::chrono::steady_clock::now();
auto elapsed = std::chrono::steady_clock::now() - start;
size_t message_count = 0;
while (message_count < max_poll_records_ && elapsed < max_poll_time_milliseconds_) {
logger_->log_debug("Polling for new messages for {}...", max_poll_time_milliseconds_);
const auto timeout_ms = gsl::narrow<int>(std::chrono::duration_cast<std::chrono::milliseconds>(max_poll_time_milliseconds_ - elapsed).count());
utils::rd_kafka_message_unique_ptr message{rd_kafka_consumer_poll(consumer_.get(), timeout_ms)};
if (!message) { break; }
if (RD_KAFKA_RESP_ERR_NO_ERROR != message->err) {
logger_->log_error("Received message with error {}: {}", magic_enum::enum_underlying(message->err), rd_kafka_err2str(message->err));
break;
}
const std::string_view topic_name = rd_kafka_topic_name(message->rkt);
const int32_t partition = message->partition;
message_bundles[KafkaMessageLocation{std::string(topic_name), partition}].pushBack(std::move(message));
elapsed = std::chrono::steady_clock::now() - start;
++message_count;
}
return message_bundles;
}
std::string ConsumeKafka::resolve_duplicate_headers(const std::vector<std::string>& matching_headers) const {
switch (duplicate_header_handling_) {
case consume_kafka::MessageHeaderPolicyEnum::KEEP_FIRST: return matching_headers.front();
case consume_kafka::MessageHeaderPolicyEnum::KEEP_LATEST: return matching_headers.back();
case consume_kafka::MessageHeaderPolicyEnum::COMMA_SEPARATED_MERGE: return utils::string::join(", ", matching_headers);
default: throw Exception(PROCESSOR_EXCEPTION, "\"Duplicate Header Handling\" property not recognized.");
}
}
/**
 * Collects every value stored under the given header name in the message.
 *
 * @param message     the Kafka message to inspect
 * @param header_name the header name to look up
 * @return the matching header values in the order librdkafka reports them; empty when the message
 *         has no headers, when the headers cannot be fetched, or when nothing matches
 */
std::vector<std::string> ConsumeKafka::get_matching_headers(const rd_kafka_message_t& message, const std::string& header_name) const {
  // Headers fetched this way are freed when rd_kafka_message_destroy is called
  // Detaching them using rd_kafka_message_detach_headers does not seem to work
  rd_kafka_headers_t* headers_raw = nullptr;
  const rd_kafka_resp_err_t get_header_response = rd_kafka_message_headers(&message, &headers_raw);
  if (RD_KAFKA_RESP_ERR__NOENT == get_header_response) { return {}; }
  if (RD_KAFKA_RESP_ERR_NO_ERROR != get_header_response) {
    // Report the error that was actually returned, and do not go on to read from a header list
    // that was never handed out
    logger_->log_error("Failed to fetch message headers: {}: {}",
        magic_enum::enum_underlying(get_header_response),
        rd_kafka_err2str(get_header_response));
    return {};
  }

  std::vector<std::string> matching_headers;
  // Walk the occurrences of header_name by index until librdkafka reports there are no more
  for (std::size_t header_idx = 0;; ++header_idx) {
    const char* value = nullptr;  // Not to be freed
    std::size_t size = 0;
    if (RD_KAFKA_RESP_ERR_NO_ERROR !=
        rd_kafka_header_get(headers_raw, header_idx, header_name.c_str(), reinterpret_cast<const void**>(&value), &size)) {
      break;
    }
    matching_headers.emplace_back(value, size);
  }
  return matching_headers;
}
std::vector<std::pair<std::string, std::string>> ConsumeKafka::getFlowFilesAttributesFromMessageHeaders(const rd_kafka_message_t& message) const {
gsl_Assert(headers_to_add_as_attributes_);
std::vector<std::pair<std::string, std::string>> attributes_from_headers;
for (const std::string& header_name: *headers_to_add_as_attributes_) {
const std::vector<std::string> matching_headers = get_matching_headers(message, header_name);
if (!matching_headers.empty()) {
attributes_from_headers.emplace_back(header_name,
utils::get_encoded_string(resolve_duplicate_headers(matching_headers), message_header_encoding_));
}
}
return attributes_from_headers;
}
/**
 * Sets the Kafka attributes on a flow file created from a single message: count ("1"), the encoded
 * message key if there is one, offset, partition and topic. When header extraction is configured,
 * the attributes derived from the message headers are set afterwards.
 */
void ConsumeKafka::addAttributesToSingleMessageFlowFile(core::FlowFile& flow_file, const rd_kafka_message_t& message) const {
  flow_file.setAttribute(KafkaCountAttribute.name, "1");

  const auto encoded_key = get_encoded_message_key(message, key_attribute_encoding_);
  if (encoded_key) {
    flow_file.setAttribute(KafkaKeyAttribute.name, *encoded_key);
  }

  flow_file.setAttribute(KafkaOffsetAttribute.name, std::to_string(message.offset));
  flow_file.setAttribute(KafkaPartitionAttribute.name, std::to_string(message.partition));
  flow_file.setAttribute(KafkaTopicAttribute.name, rd_kafka_topic_name(message.rkt));

  if (!headers_to_add_as_attributes_) { return; }

  const auto header_attributes = getFlowFilesAttributesFromMessageHeaders(message);
  for (const auto& header_attribute : header_attributes) {
    flow_file.setAttribute(header_attribute.first, header_attribute.second);
  }
}
/**
 * Sets the Kafka attributes on a flow file created from a whole message bundle: the number of
 * messages, the largest offset of the bundle, and the partition and topic of the bundle's first message.
 * Must not be used when header extraction is configured.
 */
void ConsumeKafka::addAttributesToMessageBundleFlowFile(core::FlowFile& flow_file, const MessageBundle& message_bundle) const {
  gsl_Assert(!headers_to_add_as_attributes_);

  const auto& bundled_messages = message_bundle.getMessages();
  flow_file.setAttribute(KafkaCountAttribute.name, std::to_string(bundled_messages.size()));
  flow_file.setAttribute(KafkaOffsetAttribute.name, std::to_string(message_bundle.getLargestOffset()));

  // Partition and topic are read from the first message of the bundle
  const auto& first_message = bundled_messages.front();
  flow_file.setAttribute(KafkaPartitionAttribute.name, std::to_string(first_message->partition));
  flow_file.setAttribute(KafkaTopicAttribute.name, rd_kafka_topic_name(first_message->rkt));
}
void ConsumeKafka::commitOffsetsFromMessages(const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const {
if (message_bundles.empty()) { return; }
const auto partitions = utils::rd_kafka_topic_partition_list_unique_ptr{rd_kafka_topic_partition_list_new(gsl::narrow<int>(message_bundles.size()))};
for (const auto& [location, message_bundle] : message_bundles) {
logger_->log_debug("Message bundle offsets: {} {} {}", location.topic, location.partition, message_bundle.getLargestOffset());
rd_kafka_topic_partition_list_add(partitions.get(), location.topic.data(), location.partition)->offset = message_bundle.getLargestOffset() + 1;
}
const auto commit_err_code = rd_kafka_commit(consumer_.get(), partitions.get(), 0);
if (RD_KAFKA_RESP_ERR_NO_ERROR != commit_err_code) {
logger_->log_error("Committing offset failed: {}: {}",
magic_enum::enum_underlying(commit_err_code), rd_kafka_err2str(commit_err_code));
}
}
/**
 * Commits Kafka offsets based on the topic/partition/offset attributes of the incoming flow files.
 *
 * Every queued flow file is taken from the session. Flow files with a missing or unparsable
 * topic, offset or partition attribute are routed to Failure and do not take part in the commit;
 * all others are routed to Committed. For each topic+partition the offset after the largest seen
 * offset is committed synchronously.
 *
 * Throws a PROCESS_SESSION_EXCEPTION if the commit fails with anything other than
 * RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE or RD_KAFKA_RESP_ERR__NO_OFFSET (no flow file is
 * transferred in that case).
 */
void ConsumeKafka::commitOffsetsFromIncomingFlowFiles(core::ProcessSession& session) const {
  std::vector<std::pair<std::shared_ptr<core::FlowFile>, core::Relationship>> flow_files;
  while (auto ff = session.get()) { flow_files.push_back({ff, Committed}); }
  if (flow_files.empty()) { return; }

  std::unordered_map<KafkaMessageLocation, int64_t> max_offsets;
  for (auto& [flow_file, relationship]: std::ranges::reverse_view(flow_files)) {
    auto topic_name = flow_file->getAttribute(KafkaTopicAttribute.name)
        | utils::toExpected(make_error_code(core::AttributeErrorCode::MissingAttribute));
    const auto offset = flow_file->getAttribute(KafkaOffsetAttribute.name)
        | utils::toExpected(make_error_code(core::AttributeErrorCode::MissingAttribute))
        | utils::andThen(parsing::parseIntegral<int64_t>);
    const auto partition = flow_file->getAttribute(KafkaPartitionAttribute.name)
        | utils::toExpected(make_error_code(core::AttributeErrorCode::MissingAttribute))
        | utils::andThen(parsing::parseIntegral<int32_t>);

    if (!topic_name || !offset || !partition) {
      logger_->log_error("Insufficient data for setting offsets. {} : {}, {} : {}, {} : {}",
          KafkaTopicAttribute.name,
          topic_name,
          KafkaOffsetAttribute.name,
          offset,
          KafkaPartitionAttribute.name,
          partition);
      relationship = Failure;
      // The values below must not be accessed for this flow file: at least one of them is missing
      continue;
    }

    int64_t& curr_offset = max_offsets[KafkaMessageLocation{std::move(topic_name.value()), *partition}];
    curr_offset = std::max(curr_offset, *offset);
  }

  // Without a single usable flow file there is nothing to commit; the flow files are still transferred below
  if (!max_offsets.empty()) {
    const auto partitions = utils::rd_kafka_topic_partition_list_unique_ptr{rd_kafka_topic_partition_list_new(gsl::narrow<int>(max_offsets.size()))};
    for (const auto& [location, max_offset] : max_offsets) {
      // The committed offset is the one after the last processed message
      rd_kafka_topic_partition_list_add(partitions.get(), location.topic.data(), location.partition)->offset = max_offset + 1;
    }

    const auto commit_res = rd_kafka_commit(consumer_.get(), partitions.get(), 0);
    switch (commit_res) {
      case RD_KAFKA_RESP_ERR_NO_ERROR: {
        logger_->log_debug("Commit successfully from {} flowfiles", flow_files.size());
        break;
      }
      case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE: {
        logger_->log_info("{}, continuing", rd_kafka_err2str(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE));
        break;
      }
      case RD_KAFKA_RESP_ERR__NO_OFFSET: {
        logger_->log_info("{}, continuing", rd_kafka_err2str(RD_KAFKA_RESP_ERR__NO_OFFSET));
        break;
      }
      default: {
        logger_->log_error("Committing offset failed: {}: {}", magic_enum::enum_underlying(commit_res), rd_kafka_err2str(commit_res));
        throw Exception(PROCESS_SESSION_EXCEPTION, fmt::format("Committing offset failed: {}: {}", magic_enum::enum_underlying(commit_res), rd_kafka_err2str(commit_res)));
      }
    }
  }

  for (const auto& [ff, relationship]: flow_files) { session.transfer(ff, relationship); }
}
/**
 * Creates one flow file per polled message: the payload becomes the content, the Kafka attributes
 * are attached and the flow file is routed to Success. The session is committed afterwards and,
 * for the CommitAfterBatch policy, the offsets of the processed messages are committed to Kafka.
 */
void ConsumeKafka::processMessages(core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const {
  for (const auto& location_and_bundle : message_bundles) {
    for (const auto& kafka_message : location_and_bundle.second.getMessages()) {
      std::string payload = extractMessage(*kafka_message);
      auto created_flow_file = session.create();
      session.writeBuffer(created_flow_file, payload);
      addAttributesToSingleMessageFlowFile(*created_flow_file, *kafka_message);
      session.transfer(created_flow_file, Success);
    }
  }

  // The session is committed before the Kafka offsets are
  session.commit();
  if (commit_policy_ != consume_kafka::CommitPolicyEnum::CommitAfterBatch) { return; }
  commitOffsetsFromMessages(message_bundles);
}
/**
 * Creates one flow file per topic+partition bundle: the payloads of the bundle's messages are joined
 * with message_demarcator, the bundle-level Kafka attributes are attached and the flow file is routed
 * to Success.
 *
 * As in processMessages, the session is committed afterwards and, for the CommitAfterBatch policy,
 * the offsets of the processed messages are committed to Kafka. Without this, offsets would never be
 * committed for that policy when a demarcator is configured.
 */
void ConsumeKafka::processMessageBundles(core::ProcessSession& session,
    const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles, const std::string_view message_demarcator) const {
  for (const auto& msg_bundle: message_bundles | std::views::values) {
    auto flow_file = session.create();
    auto merged_message_content = utils::string::join(message_demarcator, msg_bundle.getMessages(), [](const auto& message) {
      return extractMessage(*message);
    });
    session.writeBuffer(flow_file, merged_message_content);
    addAttributesToMessageBundleFlowFile(*flow_file, msg_bundle);
    session.transfer(flow_file, Success);
  }

  // The session is committed before the Kafka offsets are, matching processMessages
  session.commit();
  if (commit_policy_ == consume_kafka::CommitPolicyEnum::CommitAfterBatch) { commitOffsetsFromMessages(message_bundles); }
}
/**
 * One processing round: for the CommitFromIncomingFlowFiles policy the offsets carried by incoming
 * flow files are committed first; then Kafka is polled and the received messages are turned into
 * flow files, either one per message or, when a message demarcator is configured, one per bundle.
 */
void ConsumeKafka::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
  const bool commits_from_incoming = commit_policy_ == consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles;
  if (commits_from_incoming) {
    commitOffsetsFromIncomingFlowFiles(session);
  }

  const auto polled_bundles = pollKafkaMessages();
  if (polled_bundles.empty()) {
    logger_->log_debug("No new messages");
    return;
  }

  if (message_demarcator_) {
    processMessageBundles(session, polled_bundles, *message_demarcator_);
    return;
  }
  processMessages(session, polled_bundles);
}
REGISTER_RESOURCE(ConsumeKafka, Processor);
} // namespace org::apache::nifi::minifi::processors