/**
* @file PublishKafka.cpp
* PublishKafka class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PublishKafka.h"
#include <stdio.h>
#include <climits>
#include <algorithm>
#include <memory>
#include <string>
#include <map>
#include <set>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
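/**
* Properties that support expression language are evaluated against each incoming FlowFile.
*/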
core::Property PublishKafka::SeedBrokers(
core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
->isRequired(true)->supportsExpressionLanguage(true)->build());
core::Property PublishKafka::Topic(core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")->isRequired(true)->supportsExpressionLanguage(true)->build());
core::Property PublishKafka::DeliveryGuarantee(
core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("TSpecifies the requirement for guaranteeing that a message is sent to Kafka")->isRequired(false)
->supportsExpressionLanguage(true)->withDefaultValue("DELIVERY_ONE_NODE")->build());
core::Property PublishKafka::MaxMessageSize(core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")->isRequired(false)->build());
core::Property PublishKafka::RequestTimeOut(
core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request in milliseconds")->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PublishKafka::ClientName(
core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")->isRequired(true)->supportsExpressionLanguage(true)->build());
/**
* These need not have EL support.
*/
core::Property PublishKafka::BatchSize("Batch Size", "Maximum number of messages batched in one MessageSet", "");
core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
core::Property PublishKafka::CompressCodec("Compress Codec", "Compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
core::Property PublishKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", "");
core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "Path to client's private key (PEM) used for authentication", "");
core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "Private key passphrase", "");
core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Kerberos Principal", "");
core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
"The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
"Supports Expression Language: true (will be evaluated using flow file attributes)",
"");
core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
void PublishKafka::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(SeedBrokers);
properties.insert(Topic);
properties.insert(DeliveryGuarantee);
properties.insert(MaxMessageSize);
properties.insert(RequestTimeOut);
properties.insert(ClientName);
properties.insert(AttributeNameRegex);
properties.insert(BatchSize);
properties.insert(QueueBufferMaxTime);
properties.insert(QueueBufferMaxSize);
properties.insert(QueueBufferMaxMessage);
properties.insert(CompressCodec);
properties.insert(MaxFlowSegSize);
properties.insert(SecurityProtocol);
properties.insert(SecurityCA);
properties.insert(SecurityCert);
properties.insert(SecurityPrivateKey);
properties.insert(SecurityPrivateKeyPassWord);
properties.insert(KerberosServiceName);
properties.insert(KerberosPrincipal);
properties.insert(KerberosKeytabPath);
properties.insert(MessageKeyField);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Failure);
relationships.insert(Success);
setSupportedRelationships(relationships);
}
void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
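// Intentionally a no-op: several properties support expression language and are
// therefore resolved against each incoming FlowFile in onTrigger().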
}
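/**
* Translates processor properties into librdkafka configuration entries via
* rd_kafka_conf_set(), then creates the producer handle with rd_kafka_new().
* Returns false when the brokers or client id are missing, or when the
* producer cannot be created; the caller routes the FlowFile to Failure.
*/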
bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff) {
std::string value;
int64_t valInt;
std::string valueConf;
char errstr[512];
rd_kafka_conf_res_t result;
auto conf_ = rd_kafka_conf_new();
auto key = conn->getKey();
if (!key->brokers_.empty()) {
result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
logger_->log_error("There are no brokers");
return false;
}
if (!key->client_id_.empty()) {
rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: client.id [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
logger_->log_error("Client id is empty");
return false;
}
// Kerberos configuration
if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valueConf = std::to_string(valInt);
result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
attributeNameRegex.assign(value);
logger_->log_debug("PublishKafka: AttributeNameRegex %s", value);
}
value = "";
if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
valInt = valInt / 1024;
valueConf = std::to_string(valInt);
rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
max_seg_size_ = ULLONG_MAX;
if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
max_seg_size_ = valInt;
logger_->log_debug("PublishKafka: max flow segment size [%d]", max_seg_size_);
}
value = "";
if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr);
}
}
value = "";
if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr);
}
value = "";
if (context->getProperty(CompressCodec.getName(), value) && !value.empty() && value != "none") {
rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: compression.codec [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
}
value = "";
value = "";
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == SECURITY_PROTOCOL_SSL) {
rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: security.protocol [%s]", value);
if (result != RD_KAFKA_CONF_OK) {
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
value = "";
if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
value = "";
if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
}
}
}
}
// Add all of the dynamic properties as librdkafka configurations
const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size());
for (const auto &key : dynamic_prop_keys) {
value = "";
if (context->getDynamicProperty(key, value) && !value.empty()) {
logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key, value);
result = rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr, sizeof(errstr));
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", key);
}
}
auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
if (!producer) {
logger_->log_error("Failed to create Kafka producer %s", errstr);
return false;
}
conn->setConnection(producer, conf_);
return true;
}
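/**
* Resolves Client Name, Known Brokers and Topic Name against the incoming FlowFile,
* obtains (and, if needed, configures) a pooled connection and topic handle, then
* streams the FlowFile content to Kafka via ReadCallback, routing the FlowFile to
* Success or Failure based on the callback status.
*/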
void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
logger_->log_trace("Enter trigger");
std::shared_ptr<core::FlowFile> flowFile = session->get();
if (!flowFile) {
return;
}
std::string client_id, brokers, topic;
std::shared_ptr<KafkaConnection> conn = nullptr;
// get the client ID, brokers, and topic from either the flowfile, the configuration, or the properties
if (context->getProperty(ClientName, client_id, flowFile) && context->getProperty(SeedBrokers, brokers, flowFile) && context->getProperty(Topic, topic, flowFile)) {
KafkaConnectionKey key;
key.brokers_ = brokers;
key.client_id_ = client_id;
conn = connection_pool_.getOrCreateConnection(key);
if (!conn->initialized()) {
logger_->log_trace("Connection not initialized to %s, %s, %s", client_id, brokers, topic);
// get the ownership so we can configure this connection
KafkaLease lease = conn->obtainOwnership();
if (!configureNewConnection(conn, context, flowFile)) {
logger_->log_error("Could not configure Kafka Connection");
session->transfer(flowFile, Failure);
return;
}
// the lease is released when it goes out of scope, relinquishing configuration ownership
}
if (!conn->hasTopic(topic)) {
auto topic_conf_ = rd_kafka_topic_conf_new();
auto topic_reference = rd_kafka_topic_new(conn->getConnection(), topic.c_str(), topic_conf_);
rd_kafka_conf_res_t result;
std::string value;
char errstr[512];
int64_t valInt;
std::string valueConf;
if (context->getProperty(DeliveryGuarantee, value, flowFile) && !value.empty()) {
rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure delivery guarantee error result [%s]", errstr);
}
value = "";
if (context->getProperty(RequestTimeOut, value, flowFile) && !value.empty()) {
core::TimeUnit unit;
if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
valueConf = std::to_string(valInt);
rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure timeout error result [%s]", errstr);
}
}
std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference, topic_conf_);
conn->putTopic(topic, kafkaTopicref);
}
} else {
logger_->log_error("Do not have required properties");
session->transfer(flowFile, Failure);
return;
}
std::string kafkaKey;
if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
} else {
kafkaKey = flowFile->getUUIDStr();
}
auto thisTopic = conn->getTopic(topic);
if (thisTopic) {
PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile, attributeNameRegex);
session->read(flowFile, &callback);
if (callback.status_ < 0) {
logger_->log_error("Failed to send flow to kafka topic %s", topic);
session->transfer(flowFile, Failure);
} else {
logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic);
session->transfer(flowFile, Success);
}
} else {
logger_->log_error("Topic %s is invalid", topic);
session->transfer(flowFile, Failure);
}
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */