| /** |
| * Copyright 2016 Yahoo Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <pulsar/Consumer.h> |
| #include <pulsar/MessageBuilder.h> |
| #include "ConsumerImpl.h" |
| #include "Utils.h" |
| |
| namespace pulsar { |
| |
| const std::string EMPTY_STRING; |
| |
| BrokerConsumerStats::BrokerConsumerStats():validTill_(now()) {}; |
| |
| BrokerConsumerStats::BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, |
| double msgRateRedeliver, std::string consumerName, int availablePermits, |
| int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, |
| std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog): |
| validTill_(validTill), |
| msgRateOut_(msgRateOut), |
| msgThroughputOut_(msgThroughputOut), |
| msgRateRedeliver_(msgRateRedeliver), |
| consumerName_(consumerName), |
| availablePermits_(availablePermits), |
| unackedMessages_(unackedMessages), |
| blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), |
| address_(address), |
| connectedSince_(connectedSince), |
| type_(type), |
| msgRateExpired_(msgRateExpired), |
| msgBacklog_(msgBacklog) |
| {} |
| |
| bool BrokerConsumerStats::isValid() const { |
| return now() <= validTill_; |
| } |
| |
| std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj) { |
| os << "\nBrokerConsumerStats [" |
| << "validTill_ = " << obj.validTill_ |
| << ", msgRateOut_ = " << obj.msgRateOut_ |
| << ", msgThroughputOut_ = " << obj.msgThroughputOut_ |
| << ", msgRateRedeliver_ = " << obj.msgRateRedeliver_ |
| << ", consumerName_ = " << obj.consumerName_ |
| << ", availablePermits_ = " << obj.availablePermits_ |
| << ", unackedMessages_ = " << obj.unackedMessages_ |
| << ", blockedConsumerOnUnackedMsgs_ = " << obj.blockedConsumerOnUnackedMsgs_ |
| << ", address_ = " << obj.address_ |
| << ", connectedSince_ = " << obj.connectedSince_ |
| << ", type_ = " << obj.type_ |
| << ", msgRateExpired_ = " << obj.msgRateExpired_ |
| << ", msgBacklog_ = " << obj.msgBacklog_ |
| << "]"; |
| return os; |
| } |
| |
| struct ConsumerConfiguration::Impl { |
| long unAckedMessagesTimeoutMs; |
| ConsumerType consumerType; |
| MessageListener messageListener; |
| bool hasMessageListener; |
| int receiverQueueSize; |
| std::string consumerName; |
| Impl() |
| : unAckedMessagesTimeoutMs(0), |
| consumerType(ConsumerExclusive), |
| messageListener(), |
| hasMessageListener(false), |
| receiverQueueSize(1000) { |
| } |
| }; |
| |
| ConsumerConfiguration::ConsumerConfiguration() |
| : impl_(boost::make_shared<Impl>()) { |
| } |
| |
| ConsumerConfiguration::~ConsumerConfiguration() { |
| } |
| |
| ConsumerConfiguration::ConsumerConfiguration(const ConsumerConfiguration& x) |
| : impl_(x.impl_) { |
| } |
| |
| ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfiguration& x) { |
| impl_ = x.impl_; |
| return *this; |
| } |
| |
| ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) { |
| impl_->consumerType = consumerType; |
| return *this; |
| } |
| |
| ConsumerType ConsumerConfiguration::getConsumerType() const { |
| return impl_->consumerType; |
| } |
| |
| ConsumerConfiguration& ConsumerConfiguration::setMessageListener(MessageListener messageListener) { |
| impl_->messageListener = messageListener; |
| impl_->hasMessageListener = true; |
| return *this; |
| } |
| |
| MessageListener ConsumerConfiguration::getMessageListener() const { |
| return impl_->messageListener; |
| } |
| |
| bool ConsumerConfiguration::hasMessageListener() const { |
| return impl_->hasMessageListener; |
| } |
| |
| void ConsumerConfiguration::setReceiverQueueSize(int size) { |
| impl_->receiverQueueSize = size; |
| } |
| |
| int ConsumerConfiguration::getReceiverQueueSize() const { |
| return impl_->receiverQueueSize; |
| } |
| |
| const std::string& ConsumerConfiguration::getConsumerName() const { |
| return impl_->consumerName; |
| } |
| |
| void ConsumerConfiguration::setConsumerName(const std::string& consumerName) { |
| impl_->consumerName = consumerName; |
| } |
| |
| long ConsumerConfiguration::getUnAckedMessagesTimeoutMs() const { |
| return impl_->unAckedMessagesTimeoutMs; |
| } |
| |
| void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) { |
| if (milliSeconds < 10000) { |
| throw "Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds."; |
| } |
| impl_->unAckedMessagesTimeoutMs = milliSeconds; |
| } |
| ////////////////////////////////////////////////////// |
| |
| Consumer::Consumer() |
| : impl_() { |
| } |
| |
| Consumer::Consumer(ConsumerImplBasePtr impl) |
| : impl_(impl) { |
| } |
| |
| const std::string& Consumer::getTopic() const { |
| return impl_ != NULL ? impl_->getTopic() : EMPTY_STRING; |
| } |
| |
| const std::string& Consumer::getSubscriptionName() const { |
| return impl_ != NULL ? impl_->getSubscriptionName() : EMPTY_STRING; |
| } |
| |
| Result Consumer::unsubscribe() { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| Promise<bool, Result> promise; |
| impl_->unsubscribeAsync(WaitForCallback(promise)); |
| Result result; |
| promise.getFuture().get(result); |
| return result; |
| } |
| |
| void Consumer::unsubscribeAsync(ResultCallback callback) { |
| if (!impl_) { |
| callback(ResultConsumerNotInitialized); |
| return; |
| } |
| |
| impl_->unsubscribeAsync(callback); |
| } |
| |
| Result Consumer::receive(Message& msg) { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| |
| return impl_->receive(msg); |
| } |
| |
| Result Consumer::receive(Message& msg, int timeoutMs) { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| |
| return impl_->receive(msg, timeoutMs); |
| } |
| |
| Result Consumer::acknowledge(const Message& message) { |
| return acknowledge(message.getMessageId()); |
| } |
| |
| Result Consumer::acknowledge(const MessageId& messageId) { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| Promise<bool, Result> promise; |
| impl_->acknowledgeAsync(messageId, WaitForCallback(promise)); |
| Result result; |
| promise.getFuture().get(result); |
| return result; |
| } |
| |
| void Consumer::acknowledgeAsync(const Message& message, ResultCallback callback) { |
| if (!impl_) { |
| callback(ResultConsumerNotInitialized); |
| return; |
| } |
| |
| impl_->acknowledgeAsync(message.getMessageId(), callback); |
| } |
| |
| void Consumer::acknowledgeAsync(const MessageId& messageId, ResultCallback callback) { |
| if (!impl_) { |
| callback(ResultConsumerNotInitialized); |
| return; |
| } |
| |
| impl_->acknowledgeAsync(messageId, callback); |
| } |
| |
| Result Consumer::acknowledgeCumulative(const Message& message) { |
| return acknowledgeCumulative(message.getMessageId()); |
| } |
| |
| Result Consumer::acknowledgeCumulative(const MessageId& messageId) { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| |
| Promise<bool, Result> promise; |
| impl_->acknowledgeCumulativeAsync(messageId, WaitForCallback(promise)); |
| Result result; |
| promise.getFuture().get(result); |
| return result; |
| } |
| |
| void Consumer::acknowledgeCumulativeAsync(const Message& message, ResultCallback callback) { |
| acknowledgeCumulativeAsync(message.getMessageId(), callback); |
| } |
| |
| void Consumer::acknowledgeCumulativeAsync(const MessageId& messageId, ResultCallback callback) { |
| if (!impl_) { |
| callback(ResultConsumerNotInitialized); |
| return; |
| } |
| |
| impl_->acknowledgeCumulativeAsync(messageId, callback); |
| } |
| |
| Result Consumer::close() { |
| Promise<bool, Result> promise; |
| closeAsync(WaitForCallback(promise)); |
| |
| Result result; |
| promise.getFuture().get(result); |
| return result; |
| } |
| |
| void Consumer::closeAsync(ResultCallback callback) { |
| if (!impl_) { |
| callback(ResultConsumerNotInitialized); |
| return; |
| } |
| |
| impl_->closeAsync(callback); |
| } |
| |
| Result Consumer::pauseMessageListener() { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| |
| return impl_->pauseMessageListener(); |
| } |
| |
| Result Consumer::resumeMessageListener() { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| |
| return impl_->resumeMessageListener(); |
| } |
| |
| void Consumer::redeliverUnacknowledgedMessages() { |
| if (impl_) { |
| impl_->redeliverUnacknowledgedMessages(); |
| } |
| } |
| |
| Result Consumer::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { |
| if (!impl_) { |
| return ResultConsumerNotInitialized; |
| } |
| return impl_->getConsumerStats(BrokerConsumerStats, partitionIndex); |
| } |
| } |