// Source blob: f05d0035853e293bfc34da04c7cdebb707012662
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <pulsar/Consumer.h>
#include <pulsar/MessageBuilder.h>
#include "ConsumerImpl.h"
#include "Utils.h"
namespace pulsar {
// Shared empty string that Consumer::getTopic() and Consumer::getSubscriptionName()
// return by reference when the Consumer has no implementation attached.
const std::string EMPTY_STRING;
/**
 * Creates an empty stats object.
 *
 * validTill_ is set to the current time returned by now(), so the validity
 * window checked by isValid() ends at the moment of construction.
 * All numeric and boolean members are zero-initialized so that reading or
 * streaming a default-constructed object never touches indeterminate values;
 * the string members are left default-constructed (empty).
 */
BrokerConsumerStats::BrokerConsumerStats()
    : validTill_(now()),
      msgRateOut_(0),
      msgThroughputOut_(0),
      msgRateRedeliver_(0),
      availablePermits_(0),
      unackedMessages_(0),
      blockedConsumerOnUnackedMsgs_(false),
      msgRateExpired_(0),
      msgBacklog_(0) {
}
/**
 * Creates a stats object from explicit values.
 *
 * Every argument is copied into the member of the same name (with a trailing
 * underscore); no validation or conversion is performed here.
 *
 * @param validTill  time compared against now() by isValid()
 * @param msgBacklog stored in msgBacklog_; the remaining parameters map to
 *                   their members in the same one-to-one fashion
 */
BrokerConsumerStats::BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut,
                                         double msgThroughputOut, double msgRateRedeliver,
                                         std::string consumerName, int availablePermits,
                                         int unackedMessages, bool blockedConsumerOnUnackedMsgs,
                                         std::string address, std::string connectedSince,
                                         std::string type, double msgRateExpired, long msgBacklog)
    : validTill_(validTill),
      msgRateOut_(msgRateOut),
      msgThroughputOut_(msgThroughputOut),
      msgRateRedeliver_(msgRateRedeliver),
      consumerName_(consumerName),
      availablePermits_(availablePermits),
      unackedMessages_(unackedMessages),
      blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs),
      address_(address),
      connectedSince_(connectedSince),
      type_(type),
      msgRateExpired_(msgRateExpired),
      msgBacklog_(msgBacklog) {
}
/**
 * Tells whether these stats are still within their validity window.
 *
 * @return true while the current time from now() has not passed validTill_;
 *         the validTill_ instant itself still counts as valid.
 */
bool BrokerConsumerStats::isValid() const {
    const bool expired = now() > validTill_;
    return !expired;
}
/**
 * Streams every field of a BrokerConsumerStats as "name = value" pairs,
 * comma separated, enclosed in "BrokerConsumerStats [ ... ]" and preceded by
 * a newline.
 *
 * @param os  destination stream
 * @param obj stats object to print
 * @return os, to allow chaining
 */
std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj) {
    os << "\nBrokerConsumerStats [";
    os << "validTill_ = " << obj.validTill_;
    os << ", msgRateOut_ = " << obj.msgRateOut_;
    os << ", msgThroughputOut_ = " << obj.msgThroughputOut_;
    os << ", msgRateRedeliver_ = " << obj.msgRateRedeliver_;
    os << ", consumerName_ = " << obj.consumerName_;
    os << ", availablePermits_ = " << obj.availablePermits_;
    os << ", unackedMessages_ = " << obj.unackedMessages_;
    os << ", blockedConsumerOnUnackedMsgs_ = " << obj.blockedConsumerOnUnackedMsgs_;
    os << ", address_ = " << obj.address_;
    os << ", connectedSince_ = " << obj.connectedSince_;
    os << ", type_ = " << obj.type_;
    os << ", msgRateExpired_ = " << obj.msgRateExpired_;
    os << ", msgBacklog_ = " << obj.msgBacklog_;
    os << "]";
    return os;
}
/**
 * Backing storage for ConsumerConfiguration; the public class reads and
 * writes these fields through its impl_ pointer.
 */
struct ConsumerConfiguration::Impl {
    long unAckedMessagesTimeoutMs;    // starts at 0
    ConsumerType consumerType;        // starts as ConsumerExclusive
    MessageListener messageListener;  // default-constructed until one is set
    bool hasMessageListener;          // flipped to true by setMessageListener()
    int receiverQueueSize;            // starts at 1000
    std::string consumerName;         // starts empty

    /** Populates every field with its default value. */
    Impl()
        : unAckedMessagesTimeoutMs(0),
          consumerType(ConsumerExclusive),
          messageListener(),
          hasMessageListener(false),
          receiverQueueSize(1000),
          consumerName() {
    }
};
/** Creates a configuration backed by a freshly allocated Impl holding the default settings. */
ConsumerConfiguration::ConsumerConfiguration()
    : impl_(boost::make_shared<ConsumerConfiguration::Impl>()) {
}
/** Nothing to release explicitly; members clean up after themselves. */
ConsumerConfiguration::~ConsumerConfiguration() {}
/**
 * Copy constructor: copies the impl_ handle of x.
 *
 * NOTE(review): impl_ is created through boost::make_shared, so the copy
 * presumably shares the same Impl with x instead of cloning the settings;
 * confirm this is intended.
 */
ConsumerConfiguration::ConsumerConfiguration(const ConsumerConfiguration& x)
    : impl_(x.impl_) {
}
/**
 * Copy assignment: replaces this object's impl_ handle with the one held by x.
 *
 * @return *this
 */
ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfiguration& x) {
    this->impl_ = x.impl_;
    return *this;
}
/**
 * Stores the requested consumer type.
 *
 * @param consumerType value written to the configuration
 * @return *this, so calls can be chained
 */
ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) {
    Impl& settings = *impl_;
    settings.consumerType = consumerType;
    return *this;
}
/** @return the consumer type currently stored in the configuration. */
ConsumerType ConsumerConfiguration::getConsumerType() const {
    const ConsumerType configuredType = impl_->consumerType;
    return configuredType;
}
/**
 * Stores the listener and records that one has been supplied.
 *
 * @param messageListener listener saved in the configuration
 * @return *this, so calls can be chained
 */
ConsumerConfiguration& ConsumerConfiguration::setMessageListener(MessageListener messageListener) {
    Impl& settings = *impl_;
    settings.messageListener = messageListener;
    // The flag is what hasMessageListener() reports; it is never reset here.
    settings.hasMessageListener = true;
    return *this;
}
/** @return a copy of the stored message listener (default-constructed if none was set). */
MessageListener ConsumerConfiguration::getMessageListener() const {
    return (*impl_).messageListener;
}
/** @return true once setMessageListener() has been called on the underlying settings. */
bool ConsumerConfiguration::hasMessageListener() const {
    const bool listenerInstalled = impl_->hasMessageListener;
    return listenerInstalled;
}
/**
 * Stores the receiver queue size as given; no range check is applied here.
 *
 * @param size new receiver queue size
 */
void ConsumerConfiguration::setReceiverQueueSize(int size) {
    (*impl_).receiverQueueSize = size;
}
/** @return the stored receiver queue size (1000 unless changed). */
int ConsumerConfiguration::getReceiverQueueSize() const {
    const int queueSize = impl_->receiverQueueSize;
    return queueSize;
}
const std::string& ConsumerConfiguration::getConsumerName() const {
return impl_->consumerName;
}
void ConsumerConfiguration::setConsumerName(const std::string& consumerName) {
impl_->consumerName = consumerName;
}
/** @return the stored unacknowledged-message timeout in milliseconds (0 unless changed). */
long ConsumerConfiguration::getUnAckedMessagesTimeoutMs() const {
    const long timeoutMs = impl_->unAckedMessagesTimeoutMs;
    return timeoutMs;
}
/**
 * Sets the unacknowledged-message timeout, in milliseconds.
 *
 * Accepted values are 0 and anything of at least 10000 (10 seconds).
 * 0 is the value a freshly constructed configuration holds, so accepting it
 * lets callers restore the default after having set a timeout.
 * NOTE(review): 0 presumably means "timeout tracking disabled" - confirm
 * against the consumer implementation that reads this setting.
 *
 * @param milliSeconds timeout to store
 * @throws const char* (a string literal) when 0 < milliSeconds < 10000.
 *         The thrown type is kept as-is so existing catch sites keep working.
 */
void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) {
    if (milliSeconds != 0 && milliSeconds < 10000) {
        throw "Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds.";
    }
    // The parameter is unsigned 64-bit while the stored field is a signed long;
    // make the narrowing conversion explicit.
    impl_->unAckedMessagesTimeoutMs = static_cast<long>(milliSeconds);
}
//////////////////////////////////////////////////////
/**
 * Creates a Consumer with no implementation attached; the operations in this
 * file that check impl_ report ResultConsumerNotInitialized (or do nothing)
 * on such an object.
 */
Consumer::Consumer()
    : impl_() {
}
/**
 * Wraps an existing consumer implementation.
 *
 * @param impl implementation every operation of this Consumer forwards to
 */
Consumer::Consumer(ConsumerImplBasePtr impl)
    : impl_(impl) {
}
/** @return the implementation's topic, or EMPTY_STRING when no implementation is attached. */
const std::string& Consumer::getTopic() const {
    if (!impl_) {
        return EMPTY_STRING;
    }
    return impl_->getTopic();
}
/** @return the implementation's subscription name, or EMPTY_STRING when no implementation is attached. */
const std::string& Consumer::getSubscriptionName() const {
    if (!impl_) {
        return EMPTY_STRING;
    }
    return impl_->getSubscriptionName();
}
/**
 * Synchronous unsubscribe built on top of the implementation's async call.
 *
 * Starts unsubscribeAsync() with a callback tied to a local promise, then
 * reads the outcome from that promise's future (presumably blocking until
 * the callback has fired).
 *
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise the result obtained from the future
 */
Result Consumer::unsubscribe() {
    if (impl_) {
        Promise<bool, Result> completion;
        impl_->unsubscribeAsync(WaitForCallback(completion));

        Result outcome;
        completion.getFuture().get(outcome);
        return outcome;
    }
    return ResultConsumerNotInitialized;
}
/**
 * Asynchronous unsubscribe.
 *
 * @param callback handed to the implementation; invoked right away with
 *                 ResultConsumerNotInitialized when no implementation is attached
 */
void Consumer::unsubscribeAsync(ResultCallback callback) {
    if (impl_) {
        impl_->unsubscribeAsync(callback);
    } else {
        callback(ResultConsumerNotInitialized);
    }
}
/**
 * Receives one message by forwarding to the implementation.
 *
 * @param msg output argument passed through to the implementation
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise whatever the implementation returns
 */
Result Consumer::receive(Message& msg) {
    if (impl_) {
        return impl_->receive(msg);
    }
    return ResultConsumerNotInitialized;
}
/**
 * Receives one message with a timeout by forwarding to the implementation.
 *
 * @param msg       output argument passed through to the implementation
 * @param timeoutMs passed through unchanged (milliseconds, judging by the name)
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise whatever the implementation returns
 */
Result Consumer::receive(Message& msg, int timeoutMs) {
    if (impl_) {
        return impl_->receive(msg, timeoutMs);
    }
    return ResultConsumerNotInitialized;
}
/**
 * Acknowledges a message; convenience overload that extracts the message id
 * and defers to acknowledge(const MessageId&).
 */
Result Consumer::acknowledge(const Message& message) {
    const MessageId& id = message.getMessageId();
    return acknowledge(id);
}
/**
 * Synchronous acknowledge built on top of the implementation's async call.
 *
 * Starts acknowledgeAsync() with a callback tied to a local promise, then
 * reads the outcome from that promise's future (presumably blocking until
 * the callback has fired).
 *
 * @param messageId id of the message to acknowledge
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise the result obtained from the future
 */
Result Consumer::acknowledge(const MessageId& messageId) {
    if (impl_) {
        Promise<bool, Result> completion;
        impl_->acknowledgeAsync(messageId, WaitForCallback(completion));

        Result outcome;
        completion.getFuture().get(outcome);
        return outcome;
    }
    return ResultConsumerNotInitialized;
}
/**
 * Asynchronous acknowledge for a message.
 *
 * The message id is only read when an implementation is attached.
 *
 * @param message  message whose id is acknowledged
 * @param callback handed to the implementation; invoked right away with
 *                 ResultConsumerNotInitialized when no implementation is attached
 */
void Consumer::acknowledgeAsync(const Message& message, ResultCallback callback) {
    if (impl_) {
        impl_->acknowledgeAsync(message.getMessageId(), callback);
    } else {
        callback(ResultConsumerNotInitialized);
    }
}
/**
 * Asynchronous acknowledge for a message id.
 *
 * @param messageId id of the message to acknowledge
 * @param callback  handed to the implementation; invoked right away with
 *                  ResultConsumerNotInitialized when no implementation is attached
 */
void Consumer::acknowledgeAsync(const MessageId& messageId, ResultCallback callback) {
    if (impl_) {
        impl_->acknowledgeAsync(messageId, callback);
    } else {
        callback(ResultConsumerNotInitialized);
    }
}
/**
 * Cumulative acknowledge for a message; convenience overload that extracts
 * the message id and defers to acknowledgeCumulative(const MessageId&).
 */
Result Consumer::acknowledgeCumulative(const Message& message) {
    const MessageId& id = message.getMessageId();
    return acknowledgeCumulative(id);
}
/**
 * Synchronous cumulative acknowledge built on top of the implementation's
 * async call.
 *
 * Starts acknowledgeCumulativeAsync() with a callback tied to a local
 * promise, then reads the outcome from that promise's future (presumably
 * blocking until the callback has fired).
 *
 * @param messageId id passed to the implementation
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise the result obtained from the future
 */
Result Consumer::acknowledgeCumulative(const MessageId& messageId) {
    if (impl_) {
        Promise<bool, Result> completion;
        impl_->acknowledgeCumulativeAsync(messageId, WaitForCallback(completion));

        Result outcome;
        completion.getFuture().get(outcome);
        return outcome;
    }
    return ResultConsumerNotInitialized;
}
void Consumer::acknowledgeCumulativeAsync(const Message& message, ResultCallback callback) {
acknowledgeCumulativeAsync(message.getMessageId(), callback);
}
/**
 * Asynchronous cumulative acknowledge for a message id.
 *
 * @param messageId id passed to the implementation
 * @param callback  handed to the implementation; invoked right away with
 *                  ResultConsumerNotInitialized when no implementation is attached
 */
void Consumer::acknowledgeCumulativeAsync(const MessageId& messageId, ResultCallback callback) {
    if (impl_) {
        impl_->acknowledgeCumulativeAsync(messageId, callback);
    } else {
        callback(ResultConsumerNotInitialized);
    }
}
/**
 * Synchronous close built on top of closeAsync().
 *
 * No null check is needed here: closeAsync() itself reports
 * ResultConsumerNotInitialized through the callback when no implementation
 * is attached.
 *
 * @return the result obtained from the promise's future
 */
Result Consumer::close() {
    Promise<bool, Result> completion;
    closeAsync(WaitForCallback(completion));

    Result outcome;
    completion.getFuture().get(outcome);
    return outcome;
}
/**
 * Asynchronous close.
 *
 * @param callback handed to the implementation; invoked right away with
 *                 ResultConsumerNotInitialized when no implementation is attached
 */
void Consumer::closeAsync(ResultCallback callback) {
    if (impl_) {
        impl_->closeAsync(callback);
    } else {
        callback(ResultConsumerNotInitialized);
    }
}
/**
 * Forwards the pause request to the implementation.
 *
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise whatever the implementation returns
 */
Result Consumer::pauseMessageListener() {
    if (impl_) {
        return impl_->pauseMessageListener();
    }
    return ResultConsumerNotInitialized;
}
/**
 * Forwards the resume request to the implementation.
 *
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise whatever the implementation returns
 */
Result Consumer::resumeMessageListener() {
    if (impl_) {
        return impl_->resumeMessageListener();
    }
    return ResultConsumerNotInitialized;
}
/**
 * Forwards the redelivery request to the implementation; silently does
 * nothing when no implementation is attached.
 */
void Consumer::redeliverUnacknowledgedMessages() {
    if (!impl_) {
        return;
    }
    impl_->redeliverUnacknowledgedMessages();
}
/**
 * Fetches consumer stats by forwarding to the implementation.
 *
 * NOTE(review): the first parameter is named after its own type, which hides
 * the type name inside this function; consider renaming it.
 *
 * @param BrokerConsumerStats output object passed through to the implementation
 * @param partitionIndex      passed through unchanged
 * @return ResultConsumerNotInitialized when no implementation is attached,
 *         otherwise whatever the implementation returns
 */
Result Consumer::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) {
    if (impl_) {
        return impl_->getConsumerStats(BrokerConsumerStats, partitionIndex);
    }
    return ResultConsumerNotInitialized;
}
}