MINIFICPP-732 - Add property to expose librdkafka "debug" values for PublishKafka
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #614
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index a1887a2..75cb55c 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -79,6 +79,7 @@
core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a field in the Input Records that should be used as the Key for the Kafka message.\n"
"Supports Expression Language: true (will be evaluated using flow file attributes)",
"");
+core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
@@ -107,6 +108,7 @@
properties.insert(KerberosPrincipal);
properties.insert(KerberosKeytabPath);
properties.insert(MessageKeyField);
+ properties.insert(DebugContexts);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
@@ -129,9 +131,16 @@
auto key = conn->getKey();
+ if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+ rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));
+ logger_->log_debug("PublishKafka: debug properties [%s]", value);
+ if (result != RD_KAFKA_CONF_OK)
+ logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr);
+ }
+
if (!key->brokers_.empty()) {
result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: bootstrap.servers [%s]", value);
+ logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_.c_str());
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
@@ -141,7 +150,7 @@
if (!key->client_id_.empty()) {
rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
- logger_->log_debug("PublishKafka: client.id [%s]", value);
+ logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_.c_str());
if (result != RD_KAFKA_CONF_OK)
logger_->log_error("PublishKafka: configure error result [%s]", errstr);
} else {
@@ -232,7 +241,6 @@
logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
}
value = "";
- value = "";
if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
if (value == SECURITY_PROTOCOL_SSL) {
rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 8a23dee..8f6806f 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -338,6 +338,7 @@
static core::Property KerberosPrincipal;
static core::Property KerberosKeytabPath;
static core::Property MessageKeyField;
+ static core::Property DebugContexts;
// Supported Relationships
static core::Relationship Failure;