| /* |
| * librdkafka - Apache Kafka C library |
| * |
| * Copyright (c) 2016 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include "rdkafka_int.h" |
| #include "rdkafka_event.h" |
| #include "rd.h" |
| |
| rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev) { |
| return rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE; |
| } |
| |
| const char *rd_kafka_event_name (const rd_kafka_event_t *rkev) { |
| switch (rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE) |
| { |
| case RD_KAFKA_EVENT_NONE: |
| return "(NONE)"; |
| case RD_KAFKA_EVENT_DR: |
| return "DeliveryReport"; |
| case RD_KAFKA_EVENT_FETCH: |
| return "Fetch"; |
| case RD_KAFKA_EVENT_LOG: |
| return "Log"; |
| case RD_KAFKA_EVENT_ERROR: |
| return "Error"; |
| case RD_KAFKA_EVENT_REBALANCE: |
| return "Rebalance"; |
| case RD_KAFKA_EVENT_OFFSET_COMMIT: |
| return "OffsetCommit"; |
| case RD_KAFKA_EVENT_STATS: |
| return "Stats"; |
| default: |
| return "?unknown?"; |
| } |
| } |
| |
| |
| |
| |
| void rd_kafka_event_destroy (rd_kafka_event_t *rkev) { |
| if (unlikely(!rkev)) |
| return; |
| rd_kafka_op_destroy(rkev); |
| } |
| |
| |
| /** |
| * @returns the next message from the event's message queue. |
| * @remark messages will be freed automatically when event is destroyed, |
| * application MUST NOT call rd_kafka_message_destroy() |
| */ |
| const rd_kafka_message_t * |
| rd_kafka_event_message_next (rd_kafka_event_t *rkev) { |
| rd_kafka_op_t *rko = rkev; |
| rd_kafka_msg_t *rkm; |
| rd_kafka_msgq_t *rkmq, *rkmq2; |
| rd_kafka_message_t *rkmessage; |
| |
| switch (rkev->rko_type) |
| { |
| case RD_KAFKA_OP_DR: |
| rkmq = &rko->rko_u.dr.msgq; |
| rkmq2 = &rko->rko_u.dr.msgq2; |
| break; |
| |
| case RD_KAFKA_OP_FETCH: |
| /* Just one message */ |
| if (rko->rko_u.fetch.evidx++ > 0) |
| return NULL; |
| |
| rkmessage = rd_kafka_message_get(rko); |
| if (unlikely(!rkmessage)) |
| return NULL; |
| |
| /* Store offset */ |
| rd_kafka_op_offset_store(NULL, rko, rkmessage); |
| |
| return rkmessage; |
| |
| |
| default: |
| return NULL; |
| } |
| |
| if (unlikely(!(rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)))) |
| return NULL; |
| |
| rd_kafka_msgq_deq(rkmq, rkm, 1); |
| |
| /* Put rkm on secondary message queue which will be purged later. */ |
| rd_kafka_msgq_enq(rkmq2, rkm); |
| |
| return rd_kafka_message_get_from_rkm(rko, rkm); |
| } |
| |
| |
| size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev, |
| const rd_kafka_message_t **rkmessages, size_t size) { |
| size_t cnt = 0; |
| const rd_kafka_message_t *rkmessage; |
| |
| while ((rkmessage = rd_kafka_event_message_next(rkev))) |
| rkmessages[cnt++] = rkmessage; |
| |
| return cnt; |
| } |
| |
| |
| size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev) { |
| switch (rkev->rko_evtype) |
| { |
| case RD_KAFKA_EVENT_DR: |
| return rd_atomic32_get(&rkev->rko_u.dr.msgq.rkmq_msg_cnt); |
| case RD_KAFKA_EVENT_FETCH: |
| return 1; |
| default: |
| return 0; |
| } |
| } |
| |
| |
| rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev) { |
| return rkev->rko_err; |
| } |
| |
| const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev) { |
| switch (rkev->rko_type) |
| { |
| case RD_KAFKA_OP_ERR: |
| case RD_KAFKA_OP_CONSUMER_ERR: |
| if (rkev->rko_u.err.errstr) |
| return rkev->rko_u.err.errstr; |
| /* FALLTHRU */ |
| default: |
| return rd_kafka_err2str(rkev->rko_err); |
| } |
| } |
| |
| |
| void *rd_kafka_event_opaque (rd_kafka_event_t *rkev) { |
| switch (rkev->rko_type & ~RD_KAFKA_OP_FLAGMASK) |
| { |
| case RD_KAFKA_OP_OFFSET_COMMIT: |
| return rkev->rko_u.offset_commit.opaque; |
| default: |
| return NULL; |
| } |
| } |
| |
| |
| int rd_kafka_event_log (rd_kafka_event_t *rkev, const char **fac, |
| const char **str, int *level) { |
| if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG)) |
| return -1; |
| |
| if (likely(fac != NULL)) |
| *fac = rkev->rko_u.log.fac; |
| if (likely(str != NULL)) |
| *str = rkev->rko_u.log.str; |
| if (likely(level != NULL)) |
| *level = rkev->rko_u.log.level; |
| |
| return 0; |
| } |
| |
| const char *rd_kafka_event_stats (rd_kafka_event_t *rkev) { |
| return rkev->rko_u.stats.json; |
| } |
| |
| rd_kafka_topic_partition_list_t * |
| rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev) { |
| switch (rkev->rko_evtype) |
| { |
| case RD_KAFKA_EVENT_REBALANCE: |
| return rkev->rko_u.rebalance.partitions; |
| case RD_KAFKA_EVENT_OFFSET_COMMIT: |
| return rkev->rko_u.offset_commit.partitions; |
| default: |
| return NULL; |
| } |
| } |
| |
| |
| rd_kafka_topic_partition_t * |
| rd_kafka_event_topic_partition (rd_kafka_event_t *rkev) { |
| rd_kafka_topic_partition_t *rktpar; |
| |
| if (unlikely(!rkev->rko_rktp)) |
| return NULL; |
| |
| rktpar = rd_kafka_topic_partition_new_from_rktp( |
| rd_kafka_toppar_s2i(rkev->rko_rktp)); |
| |
| switch (rkev->rko_type) |
| { |
| case RD_KAFKA_OP_ERR: |
| case RD_KAFKA_OP_CONSUMER_ERR: |
| rktpar->offset = rkev->rko_u.err.offset; |
| break; |
| default: |
| break; |
| } |
| |
| rktpar->err = rkev->rko_err; |
| |
| return rktpar; |
| |
| } |