/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012,2013 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "rd.h"
#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_interceptor.h"
#include "rdcrc32.h"
#include "rdrand.h"
#include "rdtime.h"
#include "rdsysqueue.h"
#include <stdarg.h>
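/**
 * @brief Destroy a message: update the accounted message count
 *        (if F_ACCOUNT), drop the topic reference, and free the payload
 *        (if F_FREE) and the message itself (if F_FREE_RKM).
 *
 * @remark 'rk' may be NULL if the message holds a topic reference from
 *         which the rd_kafka_t handle can be derived.
 */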
void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
rd_dassert(rk || rkm->rkm_rkmessage.rkt);
rd_kafka_curr_msgs_sub(
rk ? rk :
rd_kafka_topic_a2i(rkm->rkm_rkmessage.rkt)->rkt_rk,
1, rkm->rkm_len);
}
if (likely(rkm->rkm_rkmessage.rkt != NULL))
rd_kafka_topic_destroy0(
rd_kafka_topic_a2s(rkm->rkm_rkmessage.rkt));
if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
rd_free(rkm->rkm_payload);
if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
rd_free(rkm);
}
/**
* @brief Create a new message, copying the payload as indicated by msgflags.
*
* @returns the new message
*/
static
rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
int32_t partition,
int msgflags,
char *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque) {
rd_kafka_msg_t *rkm;
size_t mlen = sizeof(*rkm);
char *p;
/* If we are to make a copy of the payload, allocate space for it too */
if (msgflags & RD_KAFKA_MSG_F_COPY) {
msgflags &= ~RD_KAFKA_MSG_F_FREE;
mlen += len;
}
mlen += keylen;
/* Note: using rd_malloc here, not rd_calloc, so make sure all fields
* are properly set up. */
rkm = rd_malloc(mlen);
rkm->rkm_err = 0;
rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM | msgflags;
rkm->rkm_len = len;
rkm->rkm_opaque = msg_opaque;
rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
rkm->rkm_partition = partition;
rkm->rkm_offset = 0;
rkm->rkm_timestamp = 0;
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
p = (char *)(rkm+1);
if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
		/* Copy payload to the space following the rd_kafka_msg_t */
rkm->rkm_payload = p;
memcpy(rkm->rkm_payload, payload, len);
p += len;
} else {
/* Just point to the provided payload. */
rkm->rkm_payload = payload;
}
if (key) {
rkm->rkm_key = p;
rkm->rkm_key_len = keylen;
memcpy(rkm->rkm_key, key, keylen);
} else {
rkm->rkm_key = NULL;
rkm->rkm_key_len = 0;
}
return rkm;
}
/**
* @brief Create a new message.
*
* @remark Must only be used by producer code.
*
 * @returns the new message on success, or NULL on error, in which case
 *          'errp' is set appropriately (and 'errnop' too, if non-NULL).
*/
static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
int32_t force_partition,
int msgflags,
char *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque,
rd_kafka_resp_err_t *errp,
int *errnop,
int64_t timestamp,
rd_ts_t now) {
rd_kafka_msg_t *rkm;
if (unlikely(!payload))
len = 0;
if (!key)
keylen = 0;
if (unlikely(len + keylen >
(size_t)rkt->rkt_rk->rk_conf.max_msg_size ||
keylen > INT32_MAX)) {
*errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
if (errnop)
*errnop = EMSGSIZE;
return NULL;
}
*errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len,
msgflags & RD_KAFKA_MSG_F_BLOCK);
if (unlikely(*errp)) {
if (errnop)
*errnop = ENOBUFS;
return NULL;
}
rkm = rd_kafka_msg_new00(rkt, force_partition,
msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */,
payload, len, key, keylen, msg_opaque);
if (timestamp)
rkm->rkm_timestamp = timestamp;
else
rkm->rkm_timestamp = rd_uclock()/1000;
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
rkm->rkm_ts_enq = now;
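	/* rkm_ts_enq and rkm_ts_timeout are in microseconds (rd_clock()),
	 * hence the message_timeout_ms * 1000 conversion below. */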
if (rkt->rkt_conf.message_timeout_ms == 0) {
rkm->rkm_ts_timeout = INT64_MAX;
} else {
rkm->rkm_ts_timeout = now +
rkt->rkt_conf.message_timeout_ms * 1000;
}
/* Call interceptor chain for on_send */
rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);
return rkm;
}
/**
 * @brief Produce: creates a new message, runs the partitioner and enqueues
 *        the message on the selected partition.
*
* @returns 0 on success or -1 on error.
*
* If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
* the memory associated with the payload is still the caller's
* responsibility.
*
* @locks none
*/
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
int msgflags,
char *payload, size_t len,
const void *key, size_t keylen,
void *msg_opaque) {
rd_kafka_msg_t *rkm;
rd_kafka_resp_err_t err;
int errnox;
/* Create message */
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
payload, len, key, keylen, msg_opaque,
&err, &errnox, 0, rd_clock());
if (unlikely(!rkm)) {
		/* err and errnox are already set by msg_new0() */
rd_kafka_set_last_error(err, errnox);
return -1;
}
/* Partition the message */
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (likely(!err)) {
rd_kafka_set_last_error(0, 0);
return 0;
}
	/* Interceptors: unroll the failed message's on_send by
	 * triggering on_acknowledgement. */
rkm->rkm_err = err;
rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
&rkm->rkm_rkmessage);
/* Handle partitioner failures: it only fails when the application
* attempts to force a destination partition that does not exist
* in the cluster. Note we must clear the RD_KAFKA_MSG_F_FREE
* flag since our contract says we don't free the payload on
* failure. */
rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
/* Translate error codes to errnos. */
if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
rd_kafka_set_last_error(err, ESRCH);
else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
rd_kafka_set_last_error(err, ENOENT);
else
rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */
return -1;
}
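/**
 * Example (an illustrative sketch, not part of this file): producing
 * through the public rd_kafka_produce() API from rdkafka.h, which ends up
 * in rd_kafka_msg_new() above. Note the ownership contract documented
 * above: on failure the payload remains the caller's responsibility even
 * when RD_KAFKA_MSG_F_FREE was given. Assumes <stdio.h>/<stdlib.h> and an
 * application-created topic handle.
 *
 * @code
 * static void example_produce (rd_kafka_topic_t *rkt,
 *                              char *buf, size_t len) {
 *         if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
 *                              RD_KAFKA_MSG_F_FREE,
 *                              buf, len, NULL, 0, NULL) == -1) {
 *                 fprintf(stderr, "produce failed: %s\n",
 *                         rd_kafka_err2str(rd_kafka_last_error()));
 *                 free(buf); /* still owned by the caller on failure */
 *         }
 * }
 * @endcode
 */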
rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...) {
va_list ap;
rd_kafka_msg_t s_rkm = {
/* Message defaults */
.rkm_partition = RD_KAFKA_PARTITION_UA,
.rkm_timestamp = 0, /* current time */
};
rd_kafka_msg_t *rkm = &s_rkm;
rd_kafka_vtype_t vtype;
rd_kafka_topic_t *app_rkt;
shptr_rd_kafka_itopic_t *s_rkt = NULL;
rd_kafka_itopic_t *rkt;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
va_start(ap, rk);
while ((vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) {
switch (vtype)
{
case RD_KAFKA_VTYPE_TOPIC:
s_rkt = rd_kafka_topic_new0(rk,
va_arg(ap, const char *),
NULL, NULL, 1);
break;
case RD_KAFKA_VTYPE_RKT:
app_rkt = va_arg(ap, rd_kafka_topic_t *);
s_rkt = rd_kafka_topic_keep(
rd_kafka_topic_a2i(app_rkt));
break;
case RD_KAFKA_VTYPE_PARTITION:
rkm->rkm_partition = va_arg(ap, int32_t);
break;
case RD_KAFKA_VTYPE_VALUE:
rkm->rkm_payload = va_arg(ap, void *);
rkm->rkm_len = va_arg(ap, size_t);
break;
case RD_KAFKA_VTYPE_KEY:
rkm->rkm_key = va_arg(ap, void *);
rkm->rkm_key_len = va_arg(ap, size_t);
break;
case RD_KAFKA_VTYPE_OPAQUE:
rkm->rkm_opaque = va_arg(ap, void *);
break;
case RD_KAFKA_VTYPE_MSGFLAGS:
rkm->rkm_flags = va_arg(ap, int);
break;
case RD_KAFKA_VTYPE_TIMESTAMP:
rkm->rkm_timestamp = va_arg(ap, int64_t);
break;
default:
err = RD_KAFKA_RESP_ERR__INVALID_ARG;
break;
}
}
va_end(ap);
if (unlikely(!s_rkt))
return RD_KAFKA_RESP_ERR__INVALID_ARG;
rkt = rd_kafka_topic_s2i(s_rkt);
if (likely(!err))
rkm = rd_kafka_msg_new0(rkt,
rkm->rkm_partition,
rkm->rkm_flags,
rkm->rkm_payload, rkm->rkm_len,
rkm->rkm_key, rkm->rkm_key_len,
rkm->rkm_opaque,
&err, NULL,
rkm->rkm_timestamp, rd_clock());
	if (unlikely(err)) {
		/* Drop the topic reference acquired in the vtype loop
		 * so it is not leaked on the error path. */
		rd_kafka_topic_destroy0(s_rkt);
		return err;
	}
/* Partition the message */
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (unlikely(err)) {
/* Handle partitioner failures: it only fails when
* the application attempts to force a destination
* partition that does not exist in the cluster. */
		/* Interceptors: unroll on_send by triggering
		 * on_acknowledgement for the failed message. */
rkm->rkm_err = err;
rd_kafka_interceptors_on_acknowledgement(rk,
&rkm->rkm_rkmessage);
/* Note we must clear the RD_KAFKA_MSG_F_FREE
* flag since our contract says we don't free the payload on
* failure. */
rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
rd_kafka_msg_destroy(rk, rkm);
}
rd_kafka_topic_destroy0(s_rkt);
return err;
}
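/**
 * Example (an illustrative sketch, not part of this file): the var-arg
 * interface above is normally driven through the RD_KAFKA_V_.. convenience
 * macros from the public rdkafka.h header, which pair each vtype tag with
 * its value(s) and end the list with RD_KAFKA_V_END. Assumes an
 * already-created producer instance 'rk' and <stdio.h>.
 *
 * @code
 * rd_kafka_resp_err_t err;
 * err = rd_kafka_producev(rk,
 *                         RD_KAFKA_V_TOPIC("mytopic"),
 *                         RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
 *                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
 *                         RD_KAFKA_V_VALUE("hello", 5),
 *                         RD_KAFKA_V_KEY("k", 1),
 *                         RD_KAFKA_V_END);
 * if (err)
 *         fprintf(stderr, "producev failed: %s\n", rd_kafka_err2str(err));
 * @endcode
 */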
/**
* Produce a batch of messages.
 * Returns the number of messages successfully queued for producing.
* Each message's .err will be set accordingly.
*/
int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
int msgflags,
rd_kafka_message_t *rkmessages, int message_cnt) {
rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
int i;
int64_t utc_now = rd_uclock() / 1000;
rd_ts_t now = rd_clock();
int good = 0;
rd_kafka_resp_err_t all_err = 0;
rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
	/* Partitioner case (partition==UA): hold the topic lock for the
	 * entire run; fixed partition: only acquire the lock when needed
	 * at the end. */
if (partition == RD_KAFKA_PARTITION_UA)
rd_kafka_topic_rdlock(rkt);
for (i = 0 ; i < message_cnt ; i++) {
rd_kafka_msg_t *rkm;
/* Propagate error for all messages. */
if (unlikely(all_err)) {
rkmessages[i].err = all_err;
continue;
}
/* Create message */
rkm = rd_kafka_msg_new0(rkt,
					partition, msgflags,
rkmessages[i].payload,
rkmessages[i].len,
rkmessages[i].key,
rkmessages[i].key_len,
rkmessages[i]._private,
&rkmessages[i].err,
NULL, utc_now, now);
if (unlikely(!rkm)) {
if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
all_err = rkmessages[i].err;
continue;
}
		/* Two cases here:
		 *  partition==UA:   run the partitioner (slow)
		 *  fixed partition: enqueue on a temporary queue that is
		 *                   concatenated onto the partition's queue
		 *                   at the end. */
if (partition == RD_KAFKA_PARTITION_UA) {
/* Partition the message */
rkmessages[i].err =
rd_kafka_msg_partitioner(rkt, rkm,
0/*already locked*/);
if (unlikely(rkmessages[i].err)) {
/* Interceptors: Unroll on_send by on_ack.. */
rd_kafka_interceptors_on_acknowledgement(
rkt->rkt_rk, &rkmessages[i]);
rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
continue;
}
} else {
/* Single destination partition, enqueue message
* on temporary queue for later queue concat. */
rd_kafka_msgq_enq(&tmpq, rkm);
}
rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
good++;
}
/* Specific partition */
if (partition != RD_KAFKA_PARTITION_UA) {
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_topic_rdlock(rkt);
s_rktp = rd_kafka_toppar_get_avail(rkt, partition,
1/*ua on miss*/, &all_err);
/* Concatenate tmpq onto partition queue. */
if (likely(s_rktp != NULL)) {
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
rd_atomic64_add(&rktp->rktp_c.msgs, good);
rd_kafka_toppar_concat_msgq(rktp, &tmpq);
rd_kafka_toppar_destroy(s_rktp);
}
}
rd_kafka_topic_rdunlock(rkt);
return good;
}
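/**
 * Example (an illustrative sketch, not part of this file): batch-producing
 * with the public API above. Each rd_kafka_message_t carries its own
 * payload and, after the call, its own per-message error code. Assumes an
 * application-created topic handle 'app_rkt' and <stdio.h>.
 *
 * @code
 * rd_kafka_message_t msgs[2] = {
 *         { .payload = "first",  .len = 5 },
 *         { .payload = "second", .len = 6 }
 * };
 * int good = rd_kafka_produce_batch(app_rkt, RD_KAFKA_PARTITION_UA,
 *                                   RD_KAFKA_MSG_F_COPY, msgs, 2);
 * if (good < 2) {
 *         int i;
 *         for (i = 0 ; i < 2 ; i++)
 *                 if (msgs[i].err)
 *                         fprintf(stderr, "msg %d: %s\n", i,
 *                                 rd_kafka_err2str(msgs[i].err));
 * }
 * @endcode
 */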
/**
 * Scan 'rkmq' for messages that have timed out, remove them from
 * 'rkmq' and append them to 'timedout'.
*
* Returns the number of messages timed out.
*/
int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
rd_kafka_msgq_t *timedout,
rd_ts_t now) {
rd_kafka_msg_t *rkm, *tmp;
int cnt = rd_atomic32_get(&timedout->rkmq_msg_cnt);
	/* Assume messages are added in time-sequential order. */
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
if (likely(rkm->rkm_ts_timeout > now))
break;
rd_kafka_msgq_deq(rkmq, rkm, 1);
rd_kafka_msgq_enq(timedout, rkm);
}
return rd_atomic32_get(&timedout->rkmq_msg_cnt) - cnt;
}
int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
int32_t p = rd_jitter(0, partition_cnt-1);
if (unlikely(!rd_kafka_topic_partition_available(rkt, p)))
return rd_jitter(0, partition_cnt-1);
else
return p;
}
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_crc32(key, keylen) % partition_cnt;
}
int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
if (keylen == 0)
return rd_kafka_msg_partitioner_random(rkt,
key,
keylen,
partition_cnt,
rkt_opaque,
msg_opaque);
else
return rd_kafka_msg_partitioner_consistent(rkt,
key,
keylen,
partition_cnt,
rkt_opaque,
msg_opaque);
}
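/**
 * Example (an illustrative sketch, not part of this file): plugging a
 * partitioner into a topic configuration via the public
 * rd_kafka_topic_conf_set_partitioner_cb() from rdkafka.h. A custom
 * callback ('my_partitioner' here is a hypothetical name) may also
 * delegate to the built-in partitioners above, as done in this sketch.
 *
 * @code
 * static int32_t my_partitioner (const rd_kafka_topic_t *rkt,
 *                                const void *key, size_t keylen,
 *                                int32_t partition_cnt,
 *                                void *rkt_opaque, void *msg_opaque) {
 *         return rd_kafka_msg_partitioner_consistent_random(
 *                 rkt, key, keylen, partition_cnt,
 *                 rkt_opaque, msg_opaque);
 * }
 *
 * rd_kafka_topic_conf_set_partitioner_cb(topic_conf, my_partitioner);
 * @endcode
 */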
/**
* Assigns a message to a topic partition using a partitioner.
* Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
* partitioning failed, or 0 on success.
*/
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
int do_lock) {
int32_t partition;
rd_kafka_toppar_t *rktp_new;
shptr_rd_kafka_toppar_t *s_rktp_new;
rd_kafka_resp_err_t err;
if (do_lock)
rd_kafka_topic_rdlock(rkt);
switch (rkt->rkt_state)
{
case RD_KAFKA_TOPIC_S_UNKNOWN:
/* No metadata received from cluster yet.
* Put message in UA partition and re-run partitioner when
* cluster comes up. */
partition = RD_KAFKA_PARTITION_UA;
break;
case RD_KAFKA_TOPIC_S_NOTEXISTS:
/* Topic not found in cluster.
* Fail message immediately. */
err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
if (do_lock)
rd_kafka_topic_rdunlock(rkt);
return err;
case RD_KAFKA_TOPIC_S_EXISTS:
/* Topic exists in cluster. */
/* Topic exists but has no partitions.
		 * This is usually a transient state following the
* auto-creation of a topic. */
if (unlikely(rkt->rkt_partition_cnt == 0)) {
partition = RD_KAFKA_PARTITION_UA;
break;
}
/* Partition not assigned, run partitioner. */
if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
rd_kafka_topic_t *app_rkt;
/* Provide a temporary app_rkt instance to protect
* from the case where the application decided to
* destroy its topic object prior to delivery completion
* (issue #502). */
app_rkt = rd_kafka_topic_keep_a(rkt);
partition = rkt->rkt_conf.
partitioner(app_rkt,
rkm->rkm_key,
rkm->rkm_key_len,
rkt->rkt_partition_cnt,
rkt->rkt_conf.opaque,
rkm->rkm_opaque);
rd_kafka_topic_destroy0(
rd_kafka_topic_a2s(app_rkt));
} else
partition = rkm->rkm_partition;
/* Check that partition exists. */
if (partition >= rkt->rkt_partition_cnt) {
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
if (do_lock)
rd_kafka_topic_rdunlock(rkt);
return err;
}
break;
default:
rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
break;
}
/* Get new partition */
s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
if (unlikely(!s_rktp_new)) {
/* Unknown topic or partition */
if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
else
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
if (do_lock)
rd_kafka_topic_rdunlock(rkt);
return err;
}
rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
rd_atomic64_add(&rktp_new->rktp_c.msgs, 1);
/* Update message partition */
if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
rkm->rkm_partition = partition;
/* Partition is available: enqueue msg on partition's queue */
rd_kafka_toppar_enq_msg(rktp_new, rkm);
if (do_lock)
rd_kafka_topic_rdunlock(rkt);
rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
return 0;
}
/**
* @name Public message type (rd_kafka_message_t)
*/
void rd_kafka_message_destroy (rd_kafka_message_t *rkmessage) {
rd_kafka_op_t *rko;
if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL))
rd_kafka_op_destroy(rko);
else {
rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage);
rd_kafka_msg_destroy(NULL, rkm);
}
}
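/**
 * Example (an illustrative sketch, not part of this file): every message
 * handed to the application, e.g. by rd_kafka_consumer_poll(), must
 * eventually be released with rd_kafka_message_destroy(), which frees the
 * backing op or msg as shown above. 'handle_message' is a hypothetical
 * application callback.
 *
 * @code
 * rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 100);
 * if (rkmessage) {
 *         if (!rkmessage->err)
 *                 handle_message(rkmessage);
 *         rd_kafka_message_destroy(rkmessage);
 * }
 * @endcode
 */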
rd_kafka_message_t *rd_kafka_message_new (void) {
rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
return (rd_kafka_message_t *)rkm;
}
/**
* @brief Set up a rkmessage from an rko for passing to the application.
* @remark Will trigger on_consume() interceptors if any.
*/
static rd_kafka_message_t *
rd_kafka_message_setup (rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) {
rd_kafka_itopic_t *rkt;
rd_kafka_toppar_t *rktp = NULL;
if (rko->rko_type == RD_KAFKA_OP_DR) {
rkt = rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt);
} else {
if (rko->rko_rktp) {
rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
rkt = rktp->rktp_rkt;
} else
rkt = NULL;
rkmessage->_private = rko;
}
if (!rkmessage->rkt && rkt)
rkmessage->rkt = rd_kafka_topic_keep_a(rkt);
if (rktp)
rkmessage->partition = rktp->rktp_partition;
if (!rkmessage->err)
rkmessage->err = rko->rko_err;
/* Call on_consume interceptors */
switch (rko->rko_type)
{
case RD_KAFKA_OP_FETCH:
if (!rkmessage->err && rkt)
rd_kafka_interceptors_on_consume(rkt->rkt_rk,
rkmessage);
break;
default:
break;
}
return rkmessage;
}
/**
* @brief Get rkmessage from rkm (for EVENT_DR)
* @remark Must only be called just prior to passing a dr to the application.
*/
rd_kafka_message_t *rd_kafka_message_get_from_rkm (rd_kafka_op_t *rko,
rd_kafka_msg_t *rkm) {
return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage);
}
/**
* @brief Convert rko to rkmessage
* @remark Must only be called just prior to passing a consumed message
* or event to the application.
* @remark Will trigger on_consume() interceptors, if any.
* @returns a rkmessage (bound to the rko).
*/
rd_kafka_message_t *rd_kafka_message_get (rd_kafka_op_t *rko) {
rd_kafka_message_t *rkmessage;
if (!rko)
return rd_kafka_message_new(); /* empty */
switch (rko->rko_type)
{
case RD_KAFKA_OP_FETCH:
/* Use embedded rkmessage */
rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage;
break;
case RD_KAFKA_OP_ERR:
case RD_KAFKA_OP_CONSUMER_ERR:
rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage;
rkmessage->payload = rko->rko_u.err.errstr;
rkmessage->offset = rko->rko_u.err.offset;
break;
default:
rd_kafka_assert(NULL, !*"unhandled optype");
RD_NOTREACHED();
return NULL;
}
return rd_kafka_message_setup(rko, rkmessage);
}
int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
rd_kafka_timestamp_type_t *tstype) {
rd_kafka_msg_t *rkm;
if (rkmessage->err) {
if (tstype)
*tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
return -1;
}
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
if (tstype)
*tstype = rkm->rkm_tstype;
return rkm->rkm_timestamp;
}
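/**
 * Example (an illustrative sketch, not part of this file): reading a
 * consumed message's timestamp and timestamp type. -1 is returned when
 * the timestamp is not available (e.g. errored messages). Assumes
 * <stdio.h>.
 *
 * @code
 * rd_kafka_timestamp_type_t tstype;
 * int64_t ts = rd_kafka_message_timestamp(rkmessage, &tstype);
 * if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
 *         printf("timestamp %lld ms (%s)\n", (long long)ts,
 *                tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME ?
 *                "create time" : "log append time");
 * @endcode
 */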
int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage) {
rd_kafka_msg_t *rkm;
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
if (unlikely(!rkm->rkm_ts_enq))
return -1;
return rd_clock() - rkm->rkm_ts_enq;
}
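/**
 * Example (an illustrative sketch, not part of this file): measuring
 * produce-to-delivery latency from a delivery report callback registered
 * with the public rd_kafka_conf_set_dr_msg_cb(). The returned value is in
 * microseconds, per rd_clock() above. Assumes <stdio.h>.
 *
 * @code
 * static void dr_msg_cb (rd_kafka_t *rk,
 *                        const rd_kafka_message_t *rkmessage,
 *                        void *opaque) {
 *         int64_t us = rd_kafka_message_latency(rkmessage);
 *         if (us != -1)
 *                 printf("delivered in %.3f ms\n", (double)us / 1000.0);
 * }
 * @endcode
 */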