blob: e880573208d7373b439052c9bfac9f1e554c9d11 [file] [log] [blame]
/*
* librdkafka - Apache Kafka C/C++ library
*
* Copyright (c) 2014 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <iostream>
#include <string>
#include <list>
#include <cerrno>
#include "rdkafkacpp_int.h"
/**
 * Out-of-line definition of the Producer destructor.
 * It performs no work of its own.
 */
RdKafka::Producer::~Producer () {}
/**
 * Delivery report trampoline with the C callback signature expected by
 * rd_kafka_conf_set_dr_msg_cb() (registered in Producer::create()).
 *
 * Wraps the C message in a RdKafka::MessageImpl and hands it to the
 * dr_cb() method of the dr_cb_ object stored on the handle.
 *
 * @param rk         Client instance the report belongs to (unused here).
 * @param rkmessage  The C message the report is about.
 * @param opaque     Cast to RdKafka::HandleImpl *; presumably installed as
 *                   the conf opaque by set_common_config() - confirm.
 */
static void dr_msg_cb_trampoline (rd_kafka_t *rk,
                                  const rd_kafka_message_t *rkmessage,
                                  void *opaque) {
  /* The message wrapper is built with a NULL first constructor argument,
   * exactly as before. */
  RdKafka::MessageImpl report(NULL, rkmessage);

  static_cast<RdKafka::HandleImpl *>(opaque)->dr_cb_->dr_cb(report);
}
/**
 * Create a new producer instance.
 *
 * @param conf    Optional configuration. When it is a RdKafka::ConfImpl it
 *                must carry a global (rk_conf_) configuration; that
 *                configuration is duplicated, so the caller's object is not
 *                handed to rd_kafka_new(). When \p conf is NULL or not a
 *                ConfImpl, rd_kafka_new() is called with a NULL conf.
 * @param errstr  Set to a human readable error string on failure.
 *
 * @returns the new producer on success, or NULL on failure (with
 *          \p errstr set).
 */
RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,
                                              std::string &errstr) {
  RdKafka::ConfImpl *global_conf = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::ProducerImpl *producer = new RdKafka::ProducerImpl();
  rd_kafka_conf_t *c_conf = NULL;

  if (global_conf) {
    /* A ConfImpl without rk_conf_ is not a global configuration object. */
    if (!global_conf->rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      delete producer;
      return NULL;
    }

    producer->set_common_config(global_conf);

    /* Work on a private copy of the C configuration. */
    c_conf = rd_kafka_conf_dup(global_conf->rk_conf_);

    /* Only hook up the delivery report trampoline when the application
     * supplied a delivery report callback object. */
    if (global_conf->dr_cb_) {
      rd_kafka_conf_set_dr_msg_cb(c_conf, dr_msg_cb_trampoline);
      producer->dr_cb_ = global_conf->dr_cb_;
    }
  }

  char errbuf[512];
  rd_kafka_t *handle = rd_kafka_new(RD_KAFKA_PRODUCER, c_conf,
                                    errbuf, sizeof(errbuf));
  if (handle == NULL) {
    /* NOTE(review): c_conf is not released here. If the linked librdkafka
     * only takes ownership of the conf on success, the duplicate leaks on
     * this path - confirm against the C library version in use. */
    errstr = errbuf;
    delete producer;
    return NULL;
  }

  producer->rk_ = handle;
  return producer;
}
/**
 * Produce a single message to \p topic with an optional std::string key.
 *
 * Thin wrapper around rd_kafka_produce(): all arguments are forwarded
 * unchanged, with \p key expanded to (c_str(), size()), or (NULL, 0)
 * when \p key is NULL.
 *
 * @param topic       Topic to produce to; must be a RdKafka::TopicImpl.
 * @param partition   Forwarded to rd_kafka_produce().
 * @param msgflags    Forwarded to rd_kafka_produce().
 * @param payload     Message payload pointer.
 * @param len         Payload length in bytes.
 * @param key         Optional message key, may be NULL.
 * @param msg_opaque  Per-message opaque forwarded to rd_kafka_produce().
 *
 * @returns ERR_NO_ERROR on success, ERR__INVALID_ARG if \p topic is NULL
 *          or not a TopicImpl, otherwise rd_kafka_last_error() converted
 *          to an ErrorCode.
 */
RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                                   int32_t partition,
                                                   int msgflags,
                                                   void *payload, size_t len,
                                                   const std::string *key,
                                                   void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  /* dynamic_cast yields NULL for a NULL or foreign Topic; report that as
   * an argument error rather than dereferencing it. */
  if (!topicimpl)
    return RdKafka::ERR__INVALID_ARG;

  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
                       payload, len,
                       key ? key->c_str() : NULL, key ? key->size() : 0,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}
/**
 * Produce a single message to \p topic with an optional raw-buffer key.
 *
 * Thin wrapper around rd_kafka_produce(): all arguments are forwarded
 * unchanged.
 *
 * @param topic       Topic to produce to; must be a RdKafka::TopicImpl.
 * @param partition   Forwarded to rd_kafka_produce().
 * @param msgflags    Forwarded to rd_kafka_produce().
 * @param payload     Message payload pointer.
 * @param len         Payload length in bytes.
 * @param key         Message key pointer.
 * @param key_len     Key length in bytes.
 * @param msg_opaque  Per-message opaque forwarded to rd_kafka_produce().
 *
 * @returns ERR_NO_ERROR on success, ERR__INVALID_ARG if \p topic is NULL
 *          or not a TopicImpl, otherwise rd_kafka_last_error() converted
 *          to an ErrorCode.
 */
RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                                   int32_t partition,
                                                   int msgflags,
                                                   void *payload, size_t len,
                                                   const void *key,
                                                   size_t key_len,
                                                   void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  /* dynamic_cast yields NULL for a NULL or foreign Topic; report that as
   * an argument error rather than dereferencing it. */
  if (!topicimpl)
    return RdKafka::ERR__INVALID_ARG;

  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
                       payload, len, key, key_len,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}
/**
 * Produce a single message to \p topic, taking payload and key as byte
 * vectors.
 *
 * Calls rd_kafka_produce() with RD_KAFKA_MSG_F_COPY. A NULL or empty
 * vector is passed on as a NULL pointer with length 0; element 0 of an
 * empty vector is never indexed, since that is undefined behaviour.
 *
 * @param topic       Topic to produce to; must be a RdKafka::TopicImpl.
 * @param partition   Forwarded to rd_kafka_produce().
 * @param payload     Optional message payload, may be NULL.
 * @param key         Optional message key, may be NULL.
 * @param msg_opaque  Per-message opaque forwarded to rd_kafka_produce().
 *
 * @returns ERR_NO_ERROR on success, ERR__INVALID_ARG if \p topic is NULL
 *          or not a TopicImpl, otherwise rd_kafka_last_error() converted
 *          to an ErrorCode.
 */
RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                int32_t partition,
                                const std::vector<char> *payload,
                                const std::vector<char> *key,
                                void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  /* dynamic_cast yields NULL for a NULL or foreign Topic; report that as
   * an argument error rather than dereferencing it. */
  if (!topicimpl)
    return RdKafka::ERR__INVALID_ARG;

  const bool has_payload = payload && !payload->empty();
  const bool has_key = key && !key->empty();

  /* rd_kafka_produce() takes a non-const payload pointer, hence the
   * const_cast on the caller's const vector. */
  void *payload_ptr =
      has_payload ? const_cast<char *>(&(*payload)[0]) : NULL;
  const size_t payload_len = has_payload ? payload->size() : 0;

  const void *key_ptr = has_key ? &(*key)[0] : NULL;
  const size_t key_len = has_key ? key->size() : 0;

  if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY,
                       payload_ptr, payload_len,
                       key_ptr, key_len,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}
/**
 * Produce a single message to the topic named \p topic_name.
 *
 * Forwards every argument to rd_kafka_producev() on this producer's rk_
 * handle and returns its result converted to an ErrorCode.
 *
 * @param topic_name  Name of the topic to produce to.
 * @param partition   Passed as RD_KAFKA_V_PARTITION.
 * @param msgflags    Passed as RD_KAFKA_V_MSGFLAGS.
 * @param payload     Message payload pointer.
 * @param len         Payload length in bytes.
 * @param key         Message key pointer.
 * @param key_len     Key length in bytes.
 * @param timestamp   Passed as RD_KAFKA_V_TIMESTAMP.
 * @param msg_opaque  Passed as RD_KAFKA_V_OPAQUE.
 *
 * @returns the rd_kafka_producev() return value cast to ErrorCode.
 */
RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (const std::string topic_name,
                                int32_t partition, int msgflags,
                                void *payload, size_t len,
                                const void *key, size_t key_len,
                                int64_t timestamp,
                                void *msg_opaque) {
  const RdKafka::ErrorCode result = static_cast<RdKafka::ErrorCode>(
      rd_kafka_producev(rk_,
                        RD_KAFKA_V_TOPIC(topic_name.c_str()),
                        RD_KAFKA_V_PARTITION(partition),
                        RD_KAFKA_V_MSGFLAGS(msgflags),
                        RD_KAFKA_V_VALUE(payload, len),
                        RD_KAFKA_V_KEY(key, key_len),
                        RD_KAFKA_V_TIMESTAMP(timestamp),
                        RD_KAFKA_V_OPAQUE(msg_opaque),
                        RD_KAFKA_V_END));

  return result;
}