/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdarg.h>
#include "rdkafka_int.h"
#include "rdkafka_op.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"

/* Current number of rd_kafka_op_t in existence
 * (only maintained in ENABLE_DEVEL builds). */
rd_atomic32_t rd_kafka_op_cnt;
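
/**
 * @brief Returns the printable name of op \p type.
 *
 * The "REPLY:" prefix is skipped unless \p type has the
 * RD_KAFKA_OP_REPLY flag set.
 */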
const char *rd_kafka_op2str (rd_kafka_op_type_t type) {
        int skiplen = 6;   /* strlen("REPLY:") */
static const char *names[] = {
[RD_KAFKA_OP_NONE] = "REPLY:NONE",
[RD_KAFKA_OP_FETCH] = "REPLY:FETCH",
[RD_KAFKA_OP_ERR] = "REPLY:ERR",
[RD_KAFKA_OP_CONSUMER_ERR] = "REPLY:CONSUMER_ERR",
[RD_KAFKA_OP_DR] = "REPLY:DR",
[RD_KAFKA_OP_STATS] = "REPLY:STATS",
[RD_KAFKA_OP_OFFSET_COMMIT] = "REPLY:OFFSET_COMMIT",
[RD_KAFKA_OP_NODE_UPDATE] = "REPLY:NODE_UPDATE",
[RD_KAFKA_OP_XMIT_BUF] = "REPLY:XMIT_BUF",
[RD_KAFKA_OP_RECV_BUF] = "REPLY:RECV_BUF",
[RD_KAFKA_OP_XMIT_RETRY] = "REPLY:XMIT_RETRY",
[RD_KAFKA_OP_FETCH_START] = "REPLY:FETCH_START",
[RD_KAFKA_OP_FETCH_STOP] = "REPLY:FETCH_STOP",
[RD_KAFKA_OP_SEEK] = "REPLY:SEEK",
[RD_KAFKA_OP_PAUSE] = "REPLY:PAUSE",
[RD_KAFKA_OP_OFFSET_FETCH] = "REPLY:OFFSET_FETCH",
[RD_KAFKA_OP_PARTITION_JOIN] = "REPLY:PARTITION_JOIN",
[RD_KAFKA_OP_PARTITION_LEAVE] = "REPLY:PARTITION_LEAVE",
[RD_KAFKA_OP_REBALANCE] = "REPLY:REBALANCE",
[RD_KAFKA_OP_TERMINATE] = "REPLY:TERMINATE",
[RD_KAFKA_OP_COORD_QUERY] = "REPLY:COORD_QUERY",
[RD_KAFKA_OP_SUBSCRIBE] = "REPLY:SUBSCRIBE",
[RD_KAFKA_OP_ASSIGN] = "REPLY:ASSIGN",
[RD_KAFKA_OP_GET_SUBSCRIPTION] = "REPLY:GET_SUBSCRIPTION",
[RD_KAFKA_OP_GET_ASSIGNMENT] = "REPLY:GET_ASSIGNMENT",
[RD_KAFKA_OP_THROTTLE] = "REPLY:THROTTLE",
[RD_KAFKA_OP_NAME] = "REPLY:NAME",
[RD_KAFKA_OP_OFFSET_RESET] = "REPLY:OFFSET_RESET",
[RD_KAFKA_OP_METADATA] = "REPLY:METADATA",
[RD_KAFKA_OP_LOG] = "REPLY:LOG",
[RD_KAFKA_OP_WAKEUP] = "REPLY:WAKEUP",
};
if (type & RD_KAFKA_OP_REPLY)
skiplen = 0;
return names[type & ~RD_KAFKA_OP_FLAGMASK]+skiplen;
}
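
/**
 * @brief Dump the fields of \p rko to \p fp for debugging,
 *        prefixing each output line with \p prefix.
 */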
void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko) {
fprintf(fp,
"%s((rd_kafka_op_t*)%p)\n"
"%s Type: %s (0x%x), Version: %"PRId32"\n",
prefix, rko,
prefix, rd_kafka_op2str(rko->rko_type), rko->rko_type,
rko->rko_version);
if (rko->rko_err)
fprintf(fp, "%s Error: %s\n",
prefix, rd_kafka_err2str(rko->rko_err));
if (rko->rko_replyq.q)
fprintf(fp, "%s Replyq %p v%d (%s)\n",
prefix, rko->rko_replyq.q, rko->rko_replyq.version,
#if ENABLE_DEVEL
rko->rko_replyq._id
#else
""
#endif
);
if (rko->rko_rktp) {
rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
fprintf(fp, "%s ((rd_kafka_toppar_t*)%p) "
"%s [%"PRId32"] v%d (shptr %p)\n",
prefix, rktp, rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
rd_atomic32_get(&rktp->rktp_version), rko->rko_rktp);
}
switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
{
case RD_KAFKA_OP_FETCH:
fprintf(fp, "%s Offset: %"PRId64"\n",
prefix, rko->rko_u.fetch.rkm.rkm_offset);
break;
case RD_KAFKA_OP_CONSUMER_ERR:
fprintf(fp, "%s Offset: %"PRId64"\n",
prefix, rko->rko_u.err.offset);
/* FALLTHRU */
case RD_KAFKA_OP_ERR:
fprintf(fp, "%s Reason: %s\n", prefix, rko->rko_u.err.errstr);
break;
case RD_KAFKA_OP_DR:
fprintf(fp, "%s %"PRId32" messages on %s\n", prefix,
rd_atomic32_get(&rko->rko_u.dr.msgq.rkmq_msg_cnt),
rko->rko_u.dr.s_rkt ?
rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt)->
rkt_topic->str : "(n/a)");
break;
case RD_KAFKA_OP_OFFSET_COMMIT:
fprintf(fp, "%s Callback: %p (opaque %p)\n",
prefix, rko->rko_u.offset_commit.cb,
rko->rko_u.offset_commit.opaque);
fprintf(fp, "%s %d partitions\n",
prefix,
rko->rko_u.offset_commit.partitions ?
rko->rko_u.offset_commit.partitions->cnt : 0);
break;
case RD_KAFKA_OP_LOG:
fprintf(fp, "%s Log: %%%d %s: %s\n",
prefix, rko->rko_u.log.level,
rko->rko_u.log.fac,
rko->rko_u.log.str);
break;
default:
break;
}
}
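
/**
 * @brief Allocate and initialize a new op of \p type.
 *
 * \p source is the caller's source location, recorded in ENABLE_DEVEL
 * builds; callers normally use the rd_kafka_op_new() wrapper which
 * supplies it.
 */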
rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type) {
rd_kafka_op_t *rko;
static const size_t op2size[RD_KAFKA_OP__END] = {
[RD_KAFKA_OP_FETCH] = sizeof(rko->rko_u.fetch),
[RD_KAFKA_OP_ERR] = sizeof(rko->rko_u.err),
[RD_KAFKA_OP_CONSUMER_ERR] = sizeof(rko->rko_u.err),
[RD_KAFKA_OP_DR] = sizeof(rko->rko_u.dr),
[RD_KAFKA_OP_STATS] = sizeof(rko->rko_u.stats),
[RD_KAFKA_OP_OFFSET_COMMIT] = sizeof(rko->rko_u.offset_commit),
[RD_KAFKA_OP_NODE_UPDATE] = sizeof(rko->rko_u.node),
[RD_KAFKA_OP_XMIT_BUF] = sizeof(rko->rko_u.xbuf),
[RD_KAFKA_OP_RECV_BUF] = sizeof(rko->rko_u.xbuf),
[RD_KAFKA_OP_XMIT_RETRY] = sizeof(rko->rko_u.xbuf),
[RD_KAFKA_OP_FETCH_START] = sizeof(rko->rko_u.fetch_start),
[RD_KAFKA_OP_FETCH_STOP] = 0,
[RD_KAFKA_OP_SEEK] = sizeof(rko->rko_u.fetch_start),
[RD_KAFKA_OP_PAUSE] = sizeof(rko->rko_u.pause),
[RD_KAFKA_OP_OFFSET_FETCH] = sizeof(rko->rko_u.offset_fetch),
[RD_KAFKA_OP_PARTITION_JOIN] = 0,
[RD_KAFKA_OP_PARTITION_LEAVE] = 0,
[RD_KAFKA_OP_REBALANCE] = sizeof(rko->rko_u.rebalance),
[RD_KAFKA_OP_TERMINATE] = 0,
[RD_KAFKA_OP_COORD_QUERY] = 0,
[RD_KAFKA_OP_SUBSCRIBE] = sizeof(rko->rko_u.subscribe),
[RD_KAFKA_OP_ASSIGN] = sizeof(rko->rko_u.assign),
[RD_KAFKA_OP_GET_SUBSCRIPTION] = sizeof(rko->rko_u.subscribe),
[RD_KAFKA_OP_GET_ASSIGNMENT] = sizeof(rko->rko_u.assign),
[RD_KAFKA_OP_THROTTLE] = sizeof(rko->rko_u.throttle),
[RD_KAFKA_OP_NAME] = sizeof(rko->rko_u.name),
[RD_KAFKA_OP_OFFSET_RESET] = sizeof(rko->rko_u.offset_reset),
[RD_KAFKA_OP_METADATA] = sizeof(rko->rko_u.metadata),
[RD_KAFKA_OP_LOG] = sizeof(rko->rko_u.log),
[RD_KAFKA_OP_WAKEUP] = 0,
};
        size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];

        /* Trim the allocation to the base op plus the type-specific
         * rko_u union member: only that member may be accessed. */
        rko = rd_calloc(1, sizeof(*rko)-sizeof(rko->rko_u)+tsize);
rko->rko_type = type;
#if ENABLE_DEVEL
rko->rko_source = source;
rd_atomic32_add(&rd_kafka_op_cnt, 1);
#endif
return rko;
}
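
/**
 * @brief Destroy \p rko: free any type-specific resources it owns,
 *        give a pending RD_KAFKA_OP_CB callback a final call with
 *        rko_err set to RD_KAFKA_RESP_ERR__DESTROY so it can clean up,
 *        and drop the toppar and replyq references.
 */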
void rd_kafka_op_destroy (rd_kafka_op_t *rko) {
switch (rko->rko_type & ~RD_KAFKA_OP_FLAGMASK)
{
case RD_KAFKA_OP_FETCH:
rd_kafka_msg_destroy(NULL, &rko->rko_u.fetch.rkm);
                /* Decrease refcount on rkbuf to eventually rd_free
                 * the shared buffer. */
if (rko->rko_u.fetch.rkbuf)
rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
break;
case RD_KAFKA_OP_OFFSET_FETCH:
if (rko->rko_u.offset_fetch.partitions &&
rko->rko_u.offset_fetch.do_free)
rd_kafka_topic_partition_list_destroy(
rko->rko_u.offset_fetch.partitions);
break;
case RD_KAFKA_OP_OFFSET_COMMIT:
RD_IF_FREE(rko->rko_u.offset_commit.partitions,
rd_kafka_topic_partition_list_destroy);
RD_IF_FREE(rko->rko_u.offset_commit.reason, rd_free);
break;
case RD_KAFKA_OP_SUBSCRIBE:
case RD_KAFKA_OP_GET_SUBSCRIPTION:
RD_IF_FREE(rko->rko_u.subscribe.topics,
rd_kafka_topic_partition_list_destroy);
break;
case RD_KAFKA_OP_ASSIGN:
case RD_KAFKA_OP_GET_ASSIGNMENT:
RD_IF_FREE(rko->rko_u.assign.partitions,
rd_kafka_topic_partition_list_destroy);
break;
case RD_KAFKA_OP_REBALANCE:
RD_IF_FREE(rko->rko_u.rebalance.partitions,
rd_kafka_topic_partition_list_destroy);
break;
case RD_KAFKA_OP_NAME:
RD_IF_FREE(rko->rko_u.name.str, rd_free);
break;
        case RD_KAFKA_OP_ERR:
        case RD_KAFKA_OP_CONSUMER_ERR:
                RD_IF_FREE(rko->rko_u.err.errstr, rd_free);
                rd_kafka_msg_destroy(NULL, &rko->rko_u.err.rkm);
                break;
case RD_KAFKA_OP_THROTTLE:
RD_IF_FREE(rko->rko_u.throttle.nodename, rd_free);
break;
case RD_KAFKA_OP_STATS:
RD_IF_FREE(rko->rko_u.stats.json, rd_free);
break;
case RD_KAFKA_OP_XMIT_RETRY:
case RD_KAFKA_OP_XMIT_BUF:
case RD_KAFKA_OP_RECV_BUF:
if (rko->rko_u.xbuf.rkbuf)
rd_kafka_buf_handle_op(rko, RD_KAFKA_RESP_ERR__DESTROY);
RD_IF_FREE(rko->rko_u.xbuf.rkbuf, rd_kafka_buf_destroy);
break;
case RD_KAFKA_OP_DR:
rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq);
if (rko->rko_u.dr.do_purge2)
rd_kafka_msgq_purge(rko->rko_rk, &rko->rko_u.dr.msgq2);
if (rko->rko_u.dr.s_rkt)
rd_kafka_topic_destroy0(rko->rko_u.dr.s_rkt);
break;
case RD_KAFKA_OP_OFFSET_RESET:
RD_IF_FREE(rko->rko_u.offset_reset.reason, rd_free);
break;
case RD_KAFKA_OP_METADATA:
RD_IF_FREE(rko->rko_u.metadata.md, rd_kafka_metadata_destroy);
break;
case RD_KAFKA_OP_LOG:
rd_free(rko->rko_u.log.str);
break;
default:
break;
}
if (rko->rko_type & RD_KAFKA_OP_CB && rko->rko_op_cb) {
rd_kafka_op_res_t res;
/* Let callback clean up */
rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
res = rko->rko_op_cb(rko->rko_rk, NULL, rko);
assert(res != RD_KAFKA_OP_RES_YIELD);
}
RD_IF_FREE(rko->rko_rktp, rd_kafka_toppar_destroy);
rd_kafka_replyq_destroy(&rko->rko_replyq);
#if ENABLE_DEVEL
if (rd_atomic32_sub(&rd_kafka_op_cnt, 1) < 0)
rd_kafka_assert(NULL, !*"rd_kafka_op_cnt < 0");
#endif
rd_free(rko);
}

/**
* Propagate an error event to the application on a specific queue.
* \p optype should be RD_KAFKA_OP_ERR for generic errors and
* RD_KAFKA_OP_CONSUMER_ERR for consumer errors.
*/
void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype,
rd_kafka_resp_err_t err, int32_t version,
rd_kafka_toppar_t *rktp, int64_t offset,
const char *fmt, ...) {
va_list ap;
char buf[2048];
rd_kafka_op_t *rko;
va_start(ap, fmt);
rd_vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
rko = rd_kafka_op_new(optype);
rko->rko_version = version;
rko->rko_err = err;
rko->rko_u.err.offset = offset;
rko->rko_u.err.errstr = rd_strdup(buf);
if (rktp)
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rd_kafka_q_enq(rkq, rko);
}
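
/*
 * Example (a sketch with hypothetical locals `rktp_version`, `offset` and
 * `nodename`): propagate a transport error for a partition to its fetch
 * queue, where the consumer will pick it up:
 *
 *   rd_kafka_q_op_err(rktp->rktp_fetchq, RD_KAFKA_OP_CONSUMER_ERR,
 *                     RD_KAFKA_RESP_ERR__TRANSPORT, rktp_version, rktp,
 *                     offset, "Connection to broker %s lost", nodename);
 */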
/**
 * Creates a reply op based on 'rko_orig'.
* If 'rko_orig' has rko_op_cb set the reply op will be OR:ed with
* RD_KAFKA_OP_CB, else the reply type will be the original rko_type OR:ed
* with RD_KAFKA_OP_REPLY.
*/
rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
rd_kafka_resp_err_t err) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(rko_orig->rko_type |
(rko_orig->rko_op_cb ?
RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY));
rd_kafka_op_get_reply_version(rko, rko_orig);
rko->rko_op_cb = rko_orig->rko_op_cb;
rko->rko_err = err;
if (rko_orig->rko_rktp)
rko->rko_rktp = rd_kafka_toppar_keep(
rd_kafka_toppar_s2i(rko_orig->rko_rktp));
return rko;
}

/**
* @brief Create new callback op for type \p type
*/
rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
rd_kafka_op_type_t type,
rd_kafka_op_cb_t *cb) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(type | RD_KAFKA_OP_CB);
rko->rko_op_cb = cb;
rko->rko_rk = rk;
return rko;
}
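
/*
 * Example (a minimal sketch; `my_op_cb` and `do_work` are hypothetical, and
 * RD_KAFKA_OP_NONE is just a placeholder base type): have `my_op_cb` run on
 * whichever thread serves `rkq`. Note that rd_kafka_op_destroy() also gives
 * the callback a final call with rko_err == RD_KAFKA_RESP_ERR__DESTROY:
 *
 *   static rd_kafka_op_res_t my_op_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq,
 *                                      rd_kafka_op_t *rko) {
 *           if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
 *                   return RD_KAFKA_OP_RES_HANDLED;   (clean-up call)
 *           do_work(rk);
 *           return RD_KAFKA_OP_RES_HANDLED;
 *   }
 *
 *   rd_kafka_q_enq(rkq, rd_kafka_op_new_cb(rk, RD_KAFKA_OP_NONE, my_op_cb));
 */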
/**
* @brief Reply to 'rko' re-using the same rko.
* If there is no replyq the rko is destroyed.
*
* @returns 1 if op was enqueued, else 0 and rko is destroyed.
*/
int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
if (!rko->rko_replyq.q) {
rd_kafka_op_destroy(rko);
return 0;
}
rko->rko_type |= (rko->rko_op_cb ? RD_KAFKA_OP_CB : RD_KAFKA_OP_REPLY);
rko->rko_err = err;
return rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
}

/**
* @brief Send request to queue, wait for response.
*
 * @returns the reply op on success, or NULL if \p destq is disabled
 *          or the wait timed out.
*/
rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq,
rd_kafka_q_t *recvq,
rd_kafka_op_t *rko,
int timeout_ms) {
rd_kafka_op_t *reply;
/* Indicate to destination where to send reply. */
rd_kafka_op_set_replyq(rko, recvq, NULL);
/* Enqueue op */
if (!rd_kafka_q_enq(destq, rko))
return NULL;
/* Wait for reply */
reply = rd_kafka_q_pop(recvq, timeout_ms, 0);
/* May be NULL for timeout */
return reply;
}

/**
* Send request to queue, wait for response.
* Creates a temporary reply queue.
*/
rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
rd_kafka_op_t *rko,
int timeout_ms) {
rd_kafka_q_t *recvq;
rd_kafka_op_t *reply;
recvq = rd_kafka_q_new(destq->rkq_rk);
reply = rd_kafka_op_req0(destq, recvq, rko, timeout_ms);
rd_kafka_q_destroy(recvq);
return reply;
}

/**
* Send simple type-only request to queue, wait for response.
*/
rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type) {
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(type);
return rd_kafka_op_req(destq, rko, RD_POLL_INFINITE);
}
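
/*
 * Example (sketch): the request helpers compose into a synchronous
 * round-trip, assuming the thread serving \p destq eventually replies
 * with rd_kafka_op_reply():
 *
 *   rd_kafka_op_t *reply = rd_kafka_op_req2(destq, RD_KAFKA_OP_COORD_QUERY);
 *   rd_kafka_resp_err_t err = rd_kafka_op_err_destroy(reply);
 */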
/**
* Destroys the rko and returns its error.
*/
rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
if (rko) {
err = rko->rko_err;
rd_kafka_op_destroy(rko);
}
return err;
}

/**
* Call op callback
*/
rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq,
rd_kafka_op_t *rko) {
rd_kafka_op_res_t res;
res = rko->rko_op_cb(rk, rkq, rko);
if (unlikely(res == RD_KAFKA_OP_RES_YIELD || rd_kafka_yield_thread))
return RD_KAFKA_OP_RES_YIELD;
rko->rko_op_cb = NULL;
return res;
}

/**
* @brief Creates a new RD_KAFKA_OP_FETCH op and sets up the
* embedded message according to the parameters.
*
* @param rkmp will be set to the embedded rkm in the rko (for convenience)
* @param offset may be updated later if relative offset.
*/
rd_kafka_op_t *
rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_buf_t *rkbuf,
int64_t offset,
size_t key_len, const void *key,
size_t val_len, const void *val) {
rd_kafka_msg_t *rkm;
rd_kafka_op_t *rko;
rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
rko->rko_version = version;
rkm = &rko->rko_u.fetch.rkm;
*rkmp = rkm;
        /* Since all the ops share the same payload buffer
         * a refcnt is used on the rkbuf to make sure consume_cb()
         * has been called for each of these ops before the rkbuf
         * and its backing memory buffers are freed. */
rko->rko_u.fetch.rkbuf = rkbuf;
rd_kafka_buf_keep(rkbuf);
rkm->rkm_offset = offset;
rkm->rkm_key = (void *)key;
rkm->rkm_key_len = key_len;
rkm->rkm_payload = (void *)val;
rkm->rkm_len = val_len;
rko->rko_len = (int32_t)rkm->rkm_len;
rkm->rkm_partition = rktp->rktp_partition;
return rko;
}
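
/*
 * Note: \p key and \p val are expected to point into \p rkbuf's payload;
 * the rd_kafka_buf_keep() reference taken above is what keeps that memory
 * valid until the op is destroyed.
 */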
/**
* Enqueue ERR__THROTTLE op, if desired.
*/
void rd_kafka_op_throttle_time (rd_kafka_broker_t *rkb,
rd_kafka_q_t *rkq,
int throttle_time) {
rd_kafka_op_t *rko;
rd_avg_add(&rkb->rkb_avg_throttle, throttle_time);
/* We send throttle events when:
* - throttle_time > 0
* - throttle_time == 0 and last throttle_time > 0
*/
if (!rkb->rkb_rk->rk_conf.throttle_cb ||
(!throttle_time && !rd_atomic32_get(&rkb->rkb_rk->rk_last_throttle)))
return;
rd_atomic32_set(&rkb->rkb_rk->rk_last_throttle, throttle_time);
rko = rd_kafka_op_new(RD_KAFKA_OP_THROTTLE);
rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
rko->rko_u.throttle.nodename = rd_strdup(rkb->rkb_nodename);
rko->rko_u.throttle.nodeid = rkb->rkb_nodeid;
rko->rko_u.throttle.throttle_time = throttle_time;
rd_kafka_q_enq(rkq, rko);
}

/**
* @brief Handle standard op types.
*/
rd_kafka_op_res_t
rd_kafka_op_handle_std (rd_kafka_t *rk, rd_kafka_q_t *rkq,
rd_kafka_op_t *rko, int cb_type) {
if (cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
return RD_KAFKA_OP_RES_PASS;
else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
rko->rko_type & RD_KAFKA_OP_CB)
return rd_kafka_op_call(rk, rkq, rko);
else if (rko->rko_type == RD_KAFKA_OP_RECV_BUF) /* Handle Response */
rd_kafka_buf_handle_op(rko, rko->rko_err);
else if (rko->rko_type == RD_KAFKA_OP_WAKEUP)
;/* do nothing, wake up is a fact anyway */
else if (cb_type != RD_KAFKA_Q_CB_RETURN &&
rko->rko_type & RD_KAFKA_OP_REPLY &&
rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
return RD_KAFKA_OP_RES_HANDLED; /* dest queue was
* probably disabled. */
else
return RD_KAFKA_OP_RES_PASS;
return RD_KAFKA_OP_RES_HANDLED;
}

/**
* @brief Attempt to handle op using its queue's serve callback,
* or the passed callback, or op_handle_std(), else do nothing.
*
* @param rkq is \p rko's queue (which it was unlinked from) with rkq_lock
* being held. Callback may re-enqueue the op on this queue
* and return YIELD.
*
* @returns HANDLED if op was handled (and destroyed), PASS if not,
* or YIELD if op was handled (maybe destroyed or re-enqueued)
* and caller must propagate yield upwards (cancel and return).
*/
rd_kafka_op_res_t
rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
rd_kafka_q_cb_type_t cb_type, void *opaque,
rd_kafka_q_serve_cb_t *callback) {
rd_kafka_op_res_t res;
res = rd_kafka_op_handle_std(rk, rkq, rko, cb_type);
if (res == RD_KAFKA_OP_RES_HANDLED) {
rd_kafka_op_destroy(rko);
return res;
} else if (unlikely(res == RD_KAFKA_OP_RES_YIELD))
return res;
if (rko->rko_serve) {
callback = rko->rko_serve;
opaque = rko->rko_serve_opaque;
rko->rko_serve = NULL;
rko->rko_serve_opaque = NULL;
}
if (callback)
res = callback(rk, rkq, rko, cb_type, opaque);
return res;
}
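
/*
 * Dispatch order implemented above (for reference): standard ops (CB,
 * RECV_BUF, WAKEUP, destroyed replies) are consumed by
 * rd_kafka_op_handle_std(); otherwise the op's own rko_serve callback,
 * if set, takes precedence over the caller-supplied \p callback; a PASS
 * result means the op is left for the caller to process.
 */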
/**
* @brief Store offset for fetched message.
*/
void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko,
const rd_kafka_message_t *rkmessage) {
rd_kafka_toppar_t *rktp;
if (unlikely(rko->rko_type != RD_KAFKA_OP_FETCH || rko->rko_err))
return;
rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
if (unlikely(!rk))
rk = rktp->rktp_rkt->rkt_rk;
rd_kafka_toppar_lock(rktp);
rktp->rktp_app_offset = rkmessage->offset+1;
if (rk->rk_conf.enable_auto_offset_store)
rd_kafka_offset_store0(rktp, rkmessage->offset+1, 0/*no lock*/);
rd_kafka_toppar_unlock(rktp);
}