/**
* @file PublishKafka.cpp
* PublishKafka class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PublishKafka.h"
#include <cstdio>
#include <algorithm>
#include <memory>
#include <string>
#include <map>
#include <set>
#include <vector>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/ScopeGuard.h"
#include "utils/GeneralUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
const core::Property PublishKafka::SeedBrokers(
core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
->isRequired(true)->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::Topic(
core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
->isRequired(true)->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::DeliveryGuarantee(
core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka. "
"Valid values are 0 (do not wait for acks), "
"-1 or all (block until message is committed by all in sync replicas) "
"or any concrete number of nodes.")
->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
const core::Property PublishKafka::MaxMessageSize(
core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
->isRequired(false)->build());
const core::Property PublishKafka::RequestTimeOut(
core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request")
->isRequired(false)->withDefaultValue<core::TimePeriodValue>("10 sec")->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::MessageTimeOut(
core::PropertyBuilder::createProperty("Message Timeout")->withDescription("The total time sending a message could take")
->isRequired(false)->withDefaultValue<core::TimePeriodValue>("30 sec")->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::ClientName(
core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")
->isRequired(true)->supportsExpressionLanguage(true)->build());
/**
* These don't appear to need EL support
*/
const core::Property PublishKafka::BatchSize(
core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")
->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
const core::Property PublishKafka::TargetBatchPayloadSize(
core::PropertyBuilder::createProperty("Target Batch Payload Size")->withDescription("The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).")
->isRequired(false)->withDefaultValue<core::DataSizeValue>("512 KB")->build());
const core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
const core::Property PublishKafka::QueueBufferMaxTime(
core::PropertyBuilder::createProperty("Queue Buffering Max Time")
->isRequired(false)
->withDefaultValue<core::TimePeriodValue>("10 sec")
->withDescription("Delay to wait for messages in the producer queue to accumulate before constructing message batches")
->build());
const core::Property PublishKafka::QueueBufferMaxSize(
core::PropertyBuilder::createProperty("Queue Max Buffer Size")
->isRequired(false)
->withDefaultValue<core::DataSizeValue>("1 MB")
->withDescription("Maximum total message size sum allowed on the producer queue")
->build());
const core::Property PublishKafka::QueueBufferMaxMessage(
core::PropertyBuilder::createProperty("Queue Max Message")
->isRequired(false)
->withDefaultValue<uint64_t>(1000)
->withDescription("Maximum number of messages allowed on the producer queue")
->build());
const core::Property PublishKafka::CompressCodec(
core::PropertyBuilder::createProperty("Compress Codec")
->isRequired(false)
->withDefaultValue<std::string>(COMPRESSION_CODEC_NONE)
->withAllowableValues<std::string>({COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY})
->withDescription("compression codec to use for compressing message sets")
->build());
const core::Property PublishKafka::MaxFlowSegSize(
core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
const core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
const core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
const core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
const core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
const core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
const core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
const core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Kerberos Principal", "");
const core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
"The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
const core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
"Supports Expression Language: true (will be evaluated using flow file attributes)",
"");
const core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. "
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
const core::Property PublishKafka::FailEmptyFlowFiles(
core::PropertyBuilder::createProperty("Fail empty flow files")
->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
"deprecated. Use connections to drop empty flow files!")
->isRequired(false)
->withDefaultValue<bool>(true)
->build());
const core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
const core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
namespace {
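// RAII deleters so the librdkafka configuration handles are freed on every early-exit path below;
// ownership is handed back to librdkafka via release() when the producer or topic is actually created.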
struct rd_kafka_conf_deleter {
void operator()(rd_kafka_conf_t* p) const noexcept { rd_kafka_conf_destroy(p); }
};
struct rd_kafka_topic_conf_deleter {
void operator()(rd_kafka_topic_conf_t* p) const noexcept { rd_kafka_topic_conf_destroy(p); }
};
} // namespace
void PublishKafka::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(SeedBrokers);
properties.insert(Topic);
properties.insert(DeliveryGuarantee);
properties.insert(MaxMessageSize);
properties.insert(RequestTimeOut);
properties.insert(MessageTimeOut);
properties.insert(ClientName);
properties.insert(AttributeNameRegex);
properties.insert(BatchSize);
properties.insert(TargetBatchPayloadSize);
properties.insert(QueueBufferMaxTime);
properties.insert(QueueBufferMaxSize);
properties.insert(QueueBufferMaxMessage);
properties.insert(CompressCodec);
properties.insert(MaxFlowSegSize);
properties.insert(SecurityProtocol);
properties.insert(SecurityCA);
properties.insert(SecurityCert);
properties.insert(SecurityPrivateKey);
properties.insert(SecurityPrivateKeyPassWord);
properties.insert(KerberosServiceName);
properties.insert(KerberosPrincipal);
properties.insert(KerberosKeytabPath);
properties.insert(MessageKeyField);
properties.insert(DebugContexts);
properties.insert(FailEmptyFlowFiles);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Failure);
relationships.insert(Success);
setSupportedRelationships(relationships);
}
void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
interrupted_ = false;
// Try to get a KafkaConnection
std::string client_id, brokers;
if (!context->getProperty(ClientName.getName(), client_id)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client Name property missing or invalid");
}
if (!context->getProperty(SeedBrokers.getName(), brokers)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Known Brokers property missing or invalid");
}
// Get some properties not (only) used directly to set up librdkafka
// Batch Size
context->getProperty(BatchSize.getName(), batch_size_);
logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size_);
// Target Batch Payload Size
context->getProperty(TargetBatchPayloadSize.getName(), target_batch_payload_size_);
logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size_);
// Max Flow Segment Size
context->getProperty(MaxFlowSegSize.getName(), max_flow_seg_size_);
logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size_);
// Attributes to Send as Headers
std::string value;
if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
attributeNameRegex_ = utils::Regex(value);
logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
}
key_.brokers_ = brokers;
key_.client_id_ = client_id;
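// Create the connection object for this broker/client id pair and build the librdkafka producer from the configured properties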
conn_ = utils::make_unique<KafkaConnection>(key_);
configureNewConnection(context);
logger_->log_debug("Successfully configured PublishKafka");
}
void PublishKafka::notifyStop() {
logger_->log_debug("notifyStop called");
interrupted_ = true;
std::lock_guard<std::mutex> lock(messages_mutex_);
for (auto& messages : messages_set_) {
messages->interrupt();
}
conn_.reset();
}
/**
* Message delivery report callback using the richer rd_kafka_message_t object.
*/
void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) {
if (rkmessage->_private == nullptr) {
return;
}
// allocated in PublishKafka::ReadCallback::produce
auto* func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private);
try {
(*func)(rk, rkmessage);
} catch (...) { }
delete func;
}
bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) {
std::string value;
int64_t valInt;
std::string valueConf;
std::array<char, 512U> errstr{};
rd_kafka_conf_res_t result;
const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() };
if (conf_ == nullptr) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object");
}
auto key = conn_->getKey();
if (key->brokers_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
}
result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
if (key->client_id_.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
}
result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
value = "";
if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "debug", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: debug [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: message.max.bytes [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valInt = valInt / 1024;
valueConf = std::to_string(valInt);
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
}
value = "";
if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(CompressCodec.getName(), value) && !value.empty() && value != "none") {
result = rd_kafka_conf_set(conf_.get(), "compression.codec", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: compression.codec [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
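// Security Protocol: only SSL is handled here. When it is selected, the certificate/key properties below are applied;
// any other non-empty value is rejected.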
value = "";
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == SECURITY_PROTOCOL_SSL) {
result = rd_kafka_conf_set(conf_.get(), "security.protocol", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: security.protocol [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
value = "";
if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "ssl.key.location", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
value = "";
if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_.get(), "ssl.key.password", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
} else {
auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown Security Protocol: ", value);
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
}
// Add all of the dynamic properties as librdkafka configurations
const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
for (const auto &prop_key : dynamic_prop_keys) {
value = "";
if (context->getDynamicProperty(prop_key, value) && !value.empty()) {
logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", prop_key, value);
result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), value.c_str(), errstr.data(), errstr.size());
if (result != RD_KAFKA_CONF_OK) {
auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
} else {
logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", prop_key);
}
}
// Set the delivery callback
rd_kafka_conf_set_dr_msg_cb(conf_.get(), &PublishKafka::messageDeliveryCallback);
// Set the logger callback
rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
// The producer takes ownership of the configuration; we must not free it
gsl::owner<rd_kafka_t*> producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_.release(), errstr.data(), errstr.size());
if (producer == nullptr) {
auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer ", errstr.data());
throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
}
conn_->setConnection(producer);
return true;
}
bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
std::unique_ptr<rd_kafka_topic_conf_t, rd_kafka_topic_conf_deleter> topic_conf_{ rd_kafka_topic_conf_new() };
if (topic_conf_ == nullptr) {
logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
return false;
}
rd_kafka_conf_res_t result;
std::string value;
std::array<char, 512U> errstr{};
int64_t valInt;
std::string valueConf;
value = "";
if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
/*
* Because of a previous error in this processor, the default value of this property was "DELIVERY_ONE_NODE".
* As this is not a valid value for "request.required.acks", the following rd_kafka_topic_conf_set call failed,
* but because of another error, this failure was silently ignored, meaning that the default value for
* "request.required.acks" did not change, and thus remained "-1". This means that having "DELIVERY_ONE_NODE" as
* the value of this property actually caused the processor to wait for delivery ACKs from ALL nodes, instead
* of just one. In order not to break configurations generated with earlier versions and keep the same behaviour
* as they had, we have to map "DELIVERY_ONE_NODE" to "-1" here.
*/
if (value == "DELIVERY_ONE_NODE") {
value = "-1";
logger_->log_warn("Using DELIVERY_ONE_NODE as the Delivery Guarantee property is deprecated and is translated to -1 "
"(block until message is committed by all in sync replicas) for backwards compatibility. "
"If you want to wait for one acknowledgment use '1' as the property.");
}
result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.required.acks", value.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
logger_->log_error("PublishKafka: configure request.required.acks error result [%s]", errstr.data());
return false;
}
}
value = "";
if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) &&
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
logger_->log_error("PublishKafka: configure request.timeout.ms error result [%s]", errstr.data());
return false;
}
}
}
value = "";
if (context->getProperty(MessageTimeOut.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) &&
core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
result = rd_kafka_topic_conf_set(topic_conf_.get(), "message.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK) {
logger_->log_error("PublishKafka: configure message.timeout.ms error result [%s]", errstr.data());
return false;
}
}
}
// The topic takes ownership of the configuration; we must not free it
gsl::owner<rd_kafka_topic_t*> topic_reference = rd_kafka_topic_new(conn_->getConnection(), topic_name.c_str(), topic_conf_.release());
if (topic_reference == nullptr) {
rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
logger_->log_error("PublishKafka: failed to create topic %s, error: %s", topic_name.c_str(), rd_kafka_err2str(resp_err));
return false;
}
const auto kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference); // KafkaTopic takes ownership of topic_reference
conn_->putTopic(topic_name, kafkaTopicref);
return true;
}
void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
// Check whether we have been interrupted
if (interrupted_) {
logger_->log_info("The processor has been interrupted, not running onTrigger");
context->yield();
return;
}
std::lock_guard<std::mutex> lock_connection(connection_mutex_);
logger_->log_debug("PublishKafka onTrigger");
// Collect FlowFiles to process
uint64_t actual_bytes = 0U;
std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
for (uint32_t i = 0; i < batch_size_; i++) {
std::shared_ptr<core::FlowFile> flowFile = session->get();
if (flowFile == nullptr) {
break;
}
actual_bytes += flowFile->getSize();
flowFiles.emplace_back(std::move(flowFile));
if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) {
break;
}
}
if (flowFiles.empty()) {
context->yield();
return;
}
logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
auto messages = std::make_shared<Messages>();
// We must add this to the messages set, so that it will be interrupted when notifyStop is called
{
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_set_.emplace(messages);
}
// We also have to ensure that it will be removed once we are done with it
const auto messagesSetGuard = gsl::finally([&]() {
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_set_.erase(messages);
});
// Process FlowFiles
for (auto& flowFile : flowFiles) {
size_t flow_file_index = messages->addFlowFile();
// Get Topic (FlowFile-dependent EL property)
std::string topic;
if (!context->getProperty(Topic, topic, flowFile)) {
logger_->log_error("Flow file %s does not have a valid Topic", flowFile->getUUIDStr());
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
// Add topic to the connection if needed
if (!conn_->hasTopic(topic)) {
if (!createNewTopic(context, topic)) {
logger_->log_error("Failed to add topic %s", topic);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
}
std::string kafkaKey;
if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
} else {
kafkaKey = flowFile->getUUIDStr();
}
auto thisTopic = conn_->getTopic(topic);
if (thisTopic == nullptr) {
logger_->log_error("Topic %s is invalid", topic);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
bool failEmptyFlowFiles = true;
context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
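// The read callback produces the flow file content to the Kafka topic, splitting it into segments according to
// Max Flow Segment Size (0 means a single message), and records per-segment delivery status in 'messages' for the routing step below.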
PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles);
session->read(flowFile, &callback);
if (!callback.called_) {
// workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
callback.process(nullptr);
}
if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
logger_->log_debug("Deprecated behavior, use connections to drop empty flow files! Failing empty flow file with uuid: %s", flowFile->getUUIDStr());
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
}
if (callback.status_ < 0) {
logger_->log_error("Failed to send flow to kafka topic %s, error: %s", topic, callback.error_);
messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
flow_file_result.flow_file_error = true;
});
continue;
}
}
logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
messages->waitForCompletion();
if (messages->wasInterrupted()) {
logger_->log_warn("Waiting for delivery confirmation was interrupted, some flow files might be routed to Failure, even if they were successfully delivered.");
}
logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");
messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
bool success;
if (flow_file.flow_file_error) {
success = false;
} else {
success = true;
for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
const auto& message = flow_file.messages[segment_num];
switch (message.status) {
case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
success = false;
logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
flowFiles[index]->getUUIDStr(),
segment_num);
break;
case MessageStatus::MESSAGESTATUS_ERROR:
success = false;
logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
flowFiles[index]->getUUIDStr(),
segment_num,
rd_kafka_err2str(message.err_code));
break;
case MessageStatus::MESSAGESTATUS_SUCCESS:
logger_->log_debug("Successfully delivered flow file %s segment %zu",
flowFiles[index]->getUUIDStr(),
segment_num);
break;
}
}
}
if (success) {
session->transfer(flowFiles[index], Success);
} else {
session->transfer(flowFiles[index], Failure);
}
});
}
} // namespace processors
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org