blob: b844f3f293c86f1222f66b9d9e4e2cfda0ee183b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "kafka.h"
#define POLL_TIMEOUT_MS 1000
/*
* Passed to all callback functions to help identify the connection.
*/
struct opaque {
    int conn_id;   // index of the kafka connection within the connection pool
};
/*
* Data structures required for the kafka client
*/
static rd_kafka_t **kaf_h;                 // one kafka producer handle per connection
static rd_kafka_topic_t **kaf_top_h;       // one topic handle per connection
static unsigned num_conns;                 // number of entries in the arrays above
static FILE *stats_fd;                     // stats output file; NULL when stats are disabled
static struct app_stats *kaf_conn_stats;   // per-connection in/out/depth/drops counters
static struct opaque *kaf_opaque;          // per-connection callback context (conn_id)
static uint64_t *kaf_keys;                 // per-connection message key (big-endian micros)
/*
* A callback executed when an error occurs within the kafka client
*/
/*
 * Logs any unexpected error raised by the kafka client.
 */
static void kaf_error_cb (rd_kafka_t *rk, int err, const char *reason, void* UNUSED(opaque))
{
    const char *conn_name = rd_kafka_name(rk);
    const char *err_text = rd_kafka_err2str(err);
    LOG_ERROR(USER1, "kafka client unexpected error; conn=%s, error=%s [%s] \n",
        conn_name, err_text, reason);
}
/*
* A callback executed when a broker throttles the producer
*/
/*
 * Logs a throttle event reported by a broker against this producer.
 */
static void kaf_throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void* UNUSED(opaque))
{
    const char *conn_name = rd_kafka_name(rk);
    LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms broker=%s broker_id=%"PRId32" \n",
        conn_name, throttle_time_ms, broker_name, broker_id);
}
/*
* A callback executed on a fixed frequency (defined by `statistics.interval.ms`)
* that provides detailed performance statistics
*/
/*
 * A callback executed on a fixed frequency (defined by `statistics.interval.ms`)
 * that provides detailed performance statistics.
 *
 * Always returns 0 so that librdkafka immediately frees the `json` buffer.
 * A non-zero return would transfer ownership of `json` to the application,
 * which never frees it, leaking the buffer on every stats interval.
 */
static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), void *opaque)
{
    int rc;
    struct opaque *data = (struct opaque*) opaque;
    int conn_id = data->conn_id;

    // update queue depth of this kafka connection
    kaf_conn_stats[conn_id].depth = rd_kafka_outq_len(rk);

    // append the raw stats json to the stats file, if one is open
    if(NULL != stats_fd) {
        // conn_id is an int; %d matches its type
        rc = fprintf(stats_fd, "{ \"conn_id\": \"%d\", \"conn_name\": \"%s\", \"stats\": %s }\n", conn_id, rd_kafka_name(rk), json);
        if(rc < 0) {
            // log and fall through; we must still return 0 so json is freed
            LOG_ERROR(USER1, "Unable to append to stats file \n");
        }
        else {
            fflush(stats_fd);
        }
    }

    // 0 ensures the json pointer is immediately freed by librdkafka
    return 0;
}
/*
* A callback that is called once for each message when it has been successfully
* produced.
*/
/*
 * Called once per message after it has been successfully produced; bumps the
 * outbound counter for the owning connection.
 */
static void kaf_message_delivered_cb (rd_kafka_t *UNUSED(rk), const rd_kafka_message_t *UNUSED(rkmessage), void *opaque)
{
    const struct opaque *conn_info = (const struct opaque*) opaque;
    kaf_conn_stats[conn_info->conn_id].out += 1;
}
/*
* Opens the file used to persist the stats coming out of the kafka client
*/
/*
 * Opens the file used to persist the stats coming out of the kafka client.
 *
 * On success, `stats_fd` points at the open stream and 0 is returned. On any
 * failure, `stats_fd` is left NULL (so later writers skip it) and a negative
 * value is returned.
 */
static int open_stats_file(char *filename)
{
    int rc;
    stats_fd = fopen(filename, "a");
    if(NULL == stats_fd) {
        LOG_ERROR(USER1, "Unable to open stats file: file=%s, error=%s \n", filename, strerror(errno));
        return -1;
    }

    // mark the file
    rc = fprintf(stats_fd, "{} \n");
    if(rc < 0) {
        LOG_ERROR(USER1, "Unable to append to stats file \n");
        // don't leak the handle, and don't let later writes hit a failed stream
        fclose(stats_fd);
        stats_fd = NULL;
        return rc;
    }
    fflush(stats_fd);
    return 0;
}
/*
* Closes the file used to persist the kafka client stats.
*/
/*
 * Closes the file used to persist the kafka client stats. Safe to call when
 * no stats file was ever opened, and safe to call more than once.
 */
static void close_stats_file(void)
{
    if(NULL != stats_fd) {
        fclose(stats_fd);
        // clear the handle so a later close (or write) can't touch a freed stream
        stats_fd = NULL;
    }
}
/**
* A callback executed for each global Kafka option.
*/
/**
 * A callback executed for each global Kafka option; applies the key/value
 * pair to the connection configuration, warning on rejected options.
 */
static void kaf_global_option(const char* key, const char* val, void* arg)
{
    char errstr[512];
    rd_kafka_conf_t* conf = (rd_kafka_conf_t*)arg;

    if (rd_kafka_conf_set(conf, key, val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_WARN(USER1, "unable to set kafka global option: '%s' = '%s': %s\n", key, val, errstr);
    }
}
/**
* A callback executed for topic-level Kafka option.
*/
/**
 * A callback executed for each topic-level Kafka option; applies the key/value
 * pair to the topic configuration, warning on rejected options.
 */
static void kaf_topic_option(const char* key, const char* val, void* arg)
{
    char errstr[512];
    rd_kafka_topic_conf_t* conf = (rd_kafka_topic_conf_t*)arg;

    if (rd_kafka_topic_conf_set(conf, key, val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        LOG_WARN(USER1, "unable to set kafka topic option: '%s' = '%s': %s\n", key, val, errstr);
    }
}
/**
* Parses the configuration values from a configuration file.
*/
static void parse_kafka_config(char* file_path, const char* group,
void (*option_cb)(const char* key, const char* val, void* arg), void* arg)
{
gsize i;
gchar* value;
gchar** keys;
gsize num_keys;
GError* err = NULL;
GError** errs = NULL;
// load the configuration file
GKeyFile* gkf = g_key_file_new();
if (!g_key_file_load_from_file(gkf, file_path, G_KEY_FILE_NONE, &err)) {
LOG_ERROR(USER1, "bad config: %s: %s\n", file_path, err->message);
}
// only grab keys within the specified group
keys = g_key_file_get_keys(gkf, group, &num_keys, errs);
if (keys) {
// execute the callback for each key/value
for (i = 0; i < num_keys; i++) {
value = g_key_file_get_value(gkf, group, keys[i], errs);
if (value) {
LOG_DEBUG(USER1, "config[%s]: %s = %s\n", group, keys[i], value);
option_cb(keys[i], value, arg);
}
else {
LOG_INFO(USER1, "bad config: %s: %s = %s: %s\n", file_path, keys[i], value, errs[0]->message);
}
}
}
else {
LOG_WARN(USER1, "bad config: %s: %s\n", file_path, errs[0]->message);
}
g_strfreev(keys);
g_key_file_free(gkf);
}
/**
* Initializes a pool of Kafka connections.
*/
void kaf_init(int num_of_conns)
{
int i;
char errstr[512];
// open the file to which the kafka stats are appended
if(NULL != app.kafka_stats_path) {
LOG_INFO(USER1, "Appending Kafka client stats to '%s' \n", app.kafka_stats_path);
open_stats_file(app.kafka_stats_path);
}
// the number of connections to maintain
num_conns = num_of_conns;
// create kafka resources for each consumer
kaf_h = calloc(num_of_conns, sizeof(rd_kafka_t*));
kaf_top_h = calloc(num_of_conns, sizeof(rd_kafka_topic_t*));
kaf_conn_stats = calloc(num_of_conns, sizeof(struct app_stats));
kaf_opaque = calloc(num_of_conns, sizeof(struct opaque));
kaf_keys = calloc(num_of_conns, sizeof(uint64_t));
for (i = 0; i < num_of_conns; i++) {
// passed to each callback function to identify the kafka connection
kaf_opaque[i] = (struct opaque) { .conn_id = i };
rd_kafka_conf_t* kaf_conf = rd_kafka_conf_new();
rd_kafka_conf_set_opaque(kaf_conf, (void *) &kaf_opaque[i]);
rd_kafka_conf_set_error_cb(kaf_conf, kaf_error_cb);
rd_kafka_conf_set_throttle_cb(kaf_conf, kaf_throttle_cb);
rd_kafka_conf_set_stats_cb(kaf_conf, kaf_stats_cb);
rd_kafka_conf_set_dr_msg_cb(kaf_conf, kaf_message_delivered_cb);
// configure kafka connection; values parsed from kafka config file
if (NULL != app.kafka_config_path) {
parse_kafka_config(app.kafka_config_path, "kafka-global", kaf_global_option, (void*)kaf_conf);
}
// create a new kafka connection
kaf_h[i] = rd_kafka_new(RD_KAFKA_PRODUCER, kaf_conf, errstr, sizeof(errstr));
if (!kaf_h[i]) {
rte_exit(EXIT_FAILURE, "Cannot init kafka connection: %s", errstr);
}
// configure kafka topic; values parsed from kafka config file
rd_kafka_topic_conf_t* topic_conf = rd_kafka_topic_conf_new();
if (NULL != app.kafka_config_path) {
parse_kafka_config(app.kafka_config_path, "kafka-topic", kaf_topic_option, (void*)topic_conf);
}
// connect to a kafka topic
kaf_top_h[i] = rd_kafka_topic_new(kaf_h[i], app.kafka_topic, topic_conf);
if (!kaf_top_h[i]) {
rte_exit(EXIT_FAILURE, "Cannot init kafka topic: %s", app.kafka_topic);
}
}
}
/*
* Executes polling across all of the kafka client connections. Ensures that any queued
* callbacks are served.
*/
/*
 * Polls every kafka connection so that any queued callbacks (delivery
 * reports, errors, stats) are served.
 */
void kaf_poll(void)
{
    unsigned conn;
    for (conn = 0; conn < num_conns; conn++) {
        rd_kafka_poll(kaf_h[conn], POLL_TIMEOUT_MS);
    }
}
/**
* Retrieves a summary of statistics across all of the kafka client connections.
*/
/**
 * Retrieves a summary of statistics across all of the kafka client
 * connections, writing the totals into `stats`. Always returns 0.
 */
int kaf_stats(struct app_stats *stats)
{
    unsigned conn;
    uint64_t total_in = 0;
    uint64_t total_out = 0;
    uint64_t total_depth = 0;
    uint64_t total_drops = 0;

    // sum the counters across every connection
    for (conn = 0; conn < num_conns; conn++) {
        total_in += kaf_conn_stats[conn].in;
        total_out += kaf_conn_stats[conn].out;
        total_depth += kaf_conn_stats[conn].depth;
        total_drops += kaf_conn_stats[conn].drops;
    }

    stats->in = total_in;
    stats->out = total_out;
    stats->depth = total_depth;
    stats->drops = total_drops;
    return 0;
}
/**
* Closes the pool of Kafka connections.
*/
void kaf_close(void)
{
unsigned i;
LOG_INFO(USER1, "Closing all Kafka connections \n");
for (i = 0; i < num_conns; i++) {
LOG_INFO(USER1, "'%u' message(s) queued on %s \n", rd_kafka_outq_len(kaf_h[i]), rd_kafka_name(kaf_h[i]));
}
for (i = 0; i < num_conns; i++) {
// wait for messages to be delivered
while (rd_kafka_outq_len(kaf_h[i]) > 0) {
LOG_INFO(USER1, "Waiting for '%u' message(s) on %s \n", rd_kafka_outq_len(kaf_h[i]), rd_kafka_name(kaf_h[i]));
rd_kafka_poll(kaf_h[i], POLL_TIMEOUT_MS);
}
LOG_INFO(USER1, "All messages cleared on %s \n", rd_kafka_name(kaf_h[i]));
rd_kafka_flush(kaf_h[i], POLL_TIMEOUT_MS);
rd_kafka_topic_destroy(kaf_top_h[i]);
rd_kafka_destroy(kaf_h[i]);
}
free(kaf_conn_stats);
free(kaf_opaque);
free(kaf_keys);
close_stats_file();
}
/**
* The current time in microseconds.
*/
/**
 * The current wall-clock time in epoch microseconds.
 */
static uint64_t current_time(void)
{
    struct timeval now;
    gettimeofday(&now, NULL);
    return ((uint64_t) now.tv_sec * 1000000ULL) + (uint64_t) now.tv_usec;
}
/**
* Publish a set of packets to a kafka topic.
*/
int kaf_send(struct rte_mbuf* pkts[], int pkt_count, int conn_id)
{
// unassigned partition
int partition = RD_KAFKA_PARTITION_UA;
int i;
int pkts_sent = 0;
rd_kafka_message_t kaf_msgs[pkt_count];
// find the topic connection based on the conn_id
rd_kafka_topic_t* kaf_topic = kaf_top_h[conn_id];
// current time in epoch microseconds from (big-endian aka network byte order)
// is added as a message key before being sent to kafka
kaf_keys[conn_id] = htobe64(current_time());
// create the batch message for kafka
for (i = 0; i < pkt_count; i++) {
kaf_msgs[i].err = 0;
kaf_msgs[i].rkt = kaf_topic;
kaf_msgs[i].partition = partition;
kaf_msgs[i].payload = rte_ctrlmbuf_data(pkts[i]);
kaf_msgs[i].len = rte_ctrlmbuf_len(pkts[i]);
kaf_msgs[i].key = (void*) &kaf_keys[conn_id];
kaf_msgs[i].key_len = sizeof(uint64_t);
kaf_msgs[i].offset = 0;
}
// hand all of the messages off to kafka
pkts_sent = rd_kafka_produce_batch(kaf_topic, partition, 0, kaf_msgs, pkt_count);
// update stats
kaf_conn_stats[conn_id].in += pkt_count;
kaf_conn_stats[conn_id].drops += (pkt_count - pkts_sent);
return pkts_sent;
}