| /* |
| * librdkafka - Apache Kafka C/C++ library |
| * |
| * Copyright (c) 2014 Magnus Edenhill |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * |
| * 1. Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * 2. Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| #include <iostream> |
| #include <string> |
| #include <list> |
| |
| #include "rdkafkacpp_int.h" |
| |
| void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| RdKafka::Topic* topic = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt)); |
| |
| RdKafka::MessageImpl message(topic, msg, false /*don't free*/); |
| |
| handle->consume_cb_->consume_cb(message, opaque); |
| } |
| |
| void RdKafka::log_cb_trampoline (const rd_kafka_t *rk, int level, |
| const char *fac, const char *buf) { |
| if (!rk) { |
| rd_kafka_log_print(rk, level, fac, buf); |
| return; |
| } |
| |
| void *opaque = rd_kafka_opaque(rk); |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| |
| if (!handle->event_cb_) { |
| rd_kafka_log_print(rk, level, fac, buf); |
| return; |
| } |
| |
| RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG, |
| RdKafka::ERR_NO_ERROR, |
| static_cast<RdKafka::Event::Severity>(level), |
| fac, buf); |
| |
| handle->event_cb_->event_cb(event); |
| } |
| |
| |
| void RdKafka::error_cb_trampoline (rd_kafka_t *rk, int err, |
| const char *reason, void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| |
| RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR, |
| static_cast<RdKafka::ErrorCode>(err), |
| RdKafka::Event::EVENT_SEVERITY_ERROR, |
| NULL, |
| reason); |
| |
| handle->event_cb_->event_cb(event); |
| } |
| |
| |
| void RdKafka::throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name, |
| int32_t broker_id, |
| int throttle_time_ms, |
| void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| |
| RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE); |
| event.str_ = broker_name; |
| event.id_ = broker_id; |
| event.throttle_time_ = throttle_time_ms; |
| |
| handle->event_cb_->event_cb(event); |
| } |
| |
| |
| int RdKafka::stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len, |
| void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| |
| RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS, |
| RdKafka::ERR_NO_ERROR, |
| RdKafka::Event::EVENT_SEVERITY_INFO, |
| NULL, json); |
| |
| handle->event_cb_->event_cb(event); |
| |
| return 0; |
| } |
| |
| |
| int RdKafka::socket_cb_trampoline (int domain, int type, int protocol, |
| void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| |
| return handle->socket_cb_->socket_cb(domain, type, protocol); |
| } |
| |
| int RdKafka::open_cb_trampoline (const char *pathname, int flags, mode_t mode, |
| void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| |
| return handle->open_cb_->open_cb(pathname, flags, static_cast<int>(mode)); |
| } |
| |
| RdKafka::ErrorCode RdKafka::HandleImpl::metadata (bool all_topics, |
| const Topic *only_rkt, |
| Metadata **metadatap, |
| int timeout_ms) { |
| |
| const rd_kafka_metadata_t *cmetadatap=NULL; |
| |
| rd_kafka_topic_t *topic = only_rkt ? |
| static_cast<const TopicImpl *>(only_rkt)->rkt_ : NULL; |
| |
| const rd_kafka_resp_err_t rc = rd_kafka_metadata(rk_, all_topics, topic, |
| &cmetadatap,timeout_ms); |
| |
| *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) ? |
| new RdKafka::MetadataImpl(cmetadatap) : NULL; |
| |
| return static_cast<RdKafka::ErrorCode>(rc); |
| } |
| |
| /** |
| * Convert a list of C partitions to C++ partitions |
| */ |
| static void c_parts_to_partitions (const rd_kafka_topic_partition_list_t |
| *c_parts, |
| std::vector<RdKafka::TopicPartition*> |
| &partitions) { |
| partitions.resize(c_parts->cnt); |
| for (int i = 0 ; i < c_parts->cnt ; i++) |
| partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]); |
| } |
| |
| static void free_partition_vector (std::vector<RdKafka::TopicPartition*> &v) { |
| for (unsigned int i = 0 ; i < v.size() ; i++) |
| delete v[i]; |
| v.clear(); |
| } |
| |
| void |
| RdKafka::rebalance_cb_trampoline (rd_kafka_t *rk, |
| rd_kafka_resp_err_t err, |
| rd_kafka_topic_partition_list_t *c_partitions, |
| void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| std::vector<RdKafka::TopicPartition*> partitions; |
| |
| c_parts_to_partitions(c_partitions, partitions); |
| |
| handle->rebalance_cb_->rebalance_cb( |
| dynamic_cast<RdKafka::KafkaConsumer*>(handle), |
| static_cast<RdKafka::ErrorCode>(err), |
| partitions); |
| |
| free_partition_vector(partitions); |
| } |
| |
| |
| void |
| RdKafka::offset_commit_cb_trampoline0 ( |
| rd_kafka_t *rk, |
| rd_kafka_resp_err_t err, |
| rd_kafka_topic_partition_list_t *c_offsets, void *opaque) { |
| OffsetCommitCb *cb = static_cast<RdKafka::OffsetCommitCb *>(opaque); |
| std::vector<RdKafka::TopicPartition*> offsets; |
| |
| if (c_offsets) |
| c_parts_to_partitions(c_offsets, offsets); |
| |
| cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err), offsets); |
| |
| free_partition_vector(offsets); |
| } |
| |
| static void |
| offset_commit_cb_trampoline ( |
| rd_kafka_t *rk, |
| rd_kafka_resp_err_t err, |
| rd_kafka_topic_partition_list_t *c_offsets, void *opaque) { |
| RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque); |
| RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets, |
| handle->offset_commit_cb_); |
| } |
| |
| |
| void RdKafka::HandleImpl::set_common_config (RdKafka::ConfImpl *confimpl) { |
| |
| rd_kafka_conf_set_opaque(confimpl->rk_conf_, this); |
| |
| if (confimpl->event_cb_) { |
| rd_kafka_conf_set_log_cb(confimpl->rk_conf_, |
| RdKafka::log_cb_trampoline); |
| rd_kafka_conf_set_error_cb(confimpl->rk_conf_, |
| RdKafka::error_cb_trampoline); |
| rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_, |
| RdKafka::throttle_cb_trampoline); |
| rd_kafka_conf_set_stats_cb(confimpl->rk_conf_, |
| RdKafka::stats_cb_trampoline); |
| event_cb_ = confimpl->event_cb_; |
| } |
| |
| if (confimpl->socket_cb_) { |
| rd_kafka_conf_set_socket_cb(confimpl->rk_conf_, |
| RdKafka::socket_cb_trampoline); |
| socket_cb_ = confimpl->socket_cb_; |
| } |
| |
| if (confimpl->open_cb_) { |
| #ifndef _MSC_VER |
| rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline); |
| open_cb_ = confimpl->open_cb_; |
| #endif |
| } |
| |
| if (confimpl->rebalance_cb_) { |
| rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_, |
| RdKafka::rebalance_cb_trampoline); |
| rebalance_cb_ = confimpl->rebalance_cb_; |
| } |
| |
| if (confimpl->offset_commit_cb_) { |
| rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_, |
| offset_commit_cb_trampoline); |
| offset_commit_cb_ = confimpl->offset_commit_cb_; |
| } |
| |
| if (confimpl->consume_cb_) { |
| rd_kafka_conf_set_consume_cb(confimpl->rk_conf_, |
| RdKafka::consume_cb_trampoline); |
| consume_cb_ = confimpl->consume_cb_; |
| } |
| |
| } |
| |
| |
| RdKafka::ErrorCode |
| RdKafka::HandleImpl::pause (std::vector<RdKafka::TopicPartition*> &partitions) { |
| rd_kafka_topic_partition_list_t *c_parts; |
| rd_kafka_resp_err_t err; |
| |
| c_parts = partitions_to_c_parts(partitions); |
| |
| err = rd_kafka_pause_partitions(rk_, c_parts); |
| |
| if (!err) |
| update_partitions_from_c_parts(partitions, c_parts); |
| |
| rd_kafka_topic_partition_list_destroy(c_parts); |
| |
| return static_cast<RdKafka::ErrorCode>(err); |
| } |
| |
| |
| RdKafka::ErrorCode |
| RdKafka::HandleImpl::resume (std::vector<RdKafka::TopicPartition*> &partitions) { |
| rd_kafka_topic_partition_list_t *c_parts; |
| rd_kafka_resp_err_t err; |
| |
| c_parts = partitions_to_c_parts(partitions); |
| |
| err = rd_kafka_resume_partitions(rk_, c_parts); |
| |
| if (!err) |
| update_partitions_from_c_parts(partitions, c_parts); |
| |
| rd_kafka_topic_partition_list_destroy(c_parts); |
| |
| return static_cast<RdKafka::ErrorCode>(err); |
| } |
| |
| RdKafka::Queue * |
| RdKafka::HandleImpl::get_partition_queue (const TopicPartition *part) { |
| rd_kafka_queue_t *rkqu; |
| rkqu = rd_kafka_queue_get_partition(rk_, |
| part->topic().c_str(), |
| part->partition()); |
| |
| if (rkqu == NULL) |
| return NULL; |
| |
| RdKafka::QueueImpl *queueimpl = new RdKafka::QueueImpl; |
| queueimpl->queue_ = rkqu; |
| |
| return queueimpl; |
| } |
| |
| RdKafka::ErrorCode |
| RdKafka::HandleImpl::set_log_queue (RdKafka::Queue *queue) { |
| rd_kafka_queue_t *rkqu = NULL; |
| if (queue) { |
| QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue); |
| rkqu = queueimpl->queue_; |
| } |
| return static_cast<RdKafka::ErrorCode>( |
| rd_kafka_set_log_queue(rk_, rkqu)); |
| } |
| |
| namespace RdKafka { |
| |
| rd_kafka_topic_partition_list_t * |
| partitions_to_c_parts (const std::vector<RdKafka::TopicPartition*> &partitions){ |
| rd_kafka_topic_partition_list_t *c_parts; |
| |
| c_parts = rd_kafka_topic_partition_list_new((int)partitions.size()); |
| |
| for (unsigned int i = 0 ; i < partitions.size() ; i++) { |
| const RdKafka::TopicPartitionImpl *tpi = |
| dynamic_cast<const RdKafka::TopicPartitionImpl*>(partitions[i]); |
| rd_kafka_topic_partition_t *rktpar = |
| rd_kafka_topic_partition_list_add(c_parts, |
| tpi->topic_.c_str(), tpi->partition_); |
| rktpar->offset = tpi->offset_; |
| } |
| |
| return c_parts; |
| } |
| |
| |
| /** |
| * @brief Update the application provided 'partitions' with info from 'c_parts' |
| */ |
| void |
| update_partitions_from_c_parts (std::vector<RdKafka::TopicPartition*> &partitions, |
| const rd_kafka_topic_partition_list_t *c_parts) { |
| for (int i = 0 ; i < c_parts->cnt ; i++) { |
| rd_kafka_topic_partition_t *p = &c_parts->elems[i]; |
| |
| /* Find corresponding C++ entry */ |
| for (unsigned int j = 0 ; j < partitions.size() ; j++) { |
| RdKafka::TopicPartitionImpl *pp = |
| dynamic_cast<RdKafka::TopicPartitionImpl*>(partitions[j]); |
| if (!strcmp(p->topic, pp->topic_.c_str()) && |
| p->partition == pp->partition_) { |
| pp->offset_ = p->offset; |
| pp->err_ = static_cast<RdKafka::ErrorCode>(p->err); |
| } |
| } |
| } |
| } |
| |
| }; |
| |