// blob: b5c51ec98fecd45c66db574a4de0d68f057a58db (appears to be repository-viewer residue; kept as a comment so the header stays valid C++)
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#include <pulsar/Client.h>
#include <memory>
#include <vector>
#include "Commands.h"
#include "ConsumerImplBase.h"
#include "ConsumerInterceptors.h"
#include "Future.h"
#include "Latch.h"
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
#include "TimeUtils.h"
#include "UnboundedBlockingQueue.h"
namespace pulsar {
// Shared pointer to a Promise that carries a Result together with a Consumer.
using ConsumerSubResultPromisePtr = std::shared_ptr<Promise<Result, Consumer>>;
// Forward declarations of the types this header refers to through shared_ptr handles.
class ConsumerImpl;
class ClientImpl;
class TopicName;
class MultiTopicsBrokerConsumerStatsImpl;
class UnAckedMessageTrackerInterface;
class LookupService;
class MultiTopicsConsumerImpl;

// Shared-ownership aliases for the forward-declared types above.
using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using TopicNamePtr = std::shared_ptr<TopicName>;
using MultiTopicsBrokerConsumerStatsPtr = std::shared_ptr<MultiTopicsBrokerConsumerStatsImpl>;
using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
using LookupServicePtr = std::shared_ptr<LookupService>;
/**
 * ConsumerImplBase implementation that owns a map of ConsumerImpl instances (consumers_) and a single
 * incoming message queue (incomingMessages_) shared by them.
 *
 * Only declarations live in this header; the behaviour of each member function is defined in the
 * corresponding source file.
 */
class MultiTopicsConsumerImpl : public ConsumerImplBase {
   public:
    /**
     * Construct from a single topic name and its partition count.
     *
     * The unnamed Commands::SubscriptionMode parameter defaults to SubscriptionModeDurable and
     * startMessageId defaults to boost::none.
     */
    MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
                            const std::string& subscriptionName, const ConsumerConfiguration& conf,
                            LookupServicePtr lookupServicePtr, const ConsumerInterceptorsPtr& interceptors,
                            Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
                            boost::optional<MessageId> startMessageId = boost::none);

    /**
     * Construct from an explicit list of topic names.
     *
     * Same defaults as the overload above. The lookup-service parameter is spelled without a trailing
     * underscore so it does not read like the lookupServicePtr_ data member.
     */
    MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                            const std::string& subscriptionName, TopicNamePtr topicName,
                            const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr,
                            const ConsumerInterceptorsPtr& interceptors,
                            Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
                            boost::optional<MessageId> startMessageId = boost::none);

    ~MultiTopicsConsumerImpl();

    // Methods overridden from ConsumerImplBase
    Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() override;
    const std::string& getSubscriptionName() const override;
    const std::string& getTopic() const override;
    Result receive(Message& msg) override;
    Result receive(Message& msg, int timeout) override;
    void receiveAsync(ReceiveCallback callback) override;
    void unsubscribeAsync(ResultCallback callback) override;
    void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
    void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
    void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
    void closeAsync(ResultCallback callback) override;
    void start() override;
    void shutdown() override;
    bool isClosed() override;
    bool isOpen() override;
    Result pauseMessageListener() override;
    Result resumeMessageListener() override;
    void redeliverUnacknowledgedMessages() override;
    void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) override;
    const std::string& getName() const override;
    int getNumOfPrefetchedMessages() const override;
    void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
    void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override;
    void seekAsync(const MessageId& msgId, ResultCallback callback) override;
    void seekAsync(uint64_t timestamp, ResultCallback callback) override;
    void negativeAcknowledge(const MessageId& msgId) override;
    bool isConnected() const override;
    uint64_t getNumberOfConnectedConsumer() override;
    void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;

    // Not an override: takes a Result and BrokerConsumerStats plus the latch, aggregate stats object,
    // index and user callback. Presumably the per-consumer completion handler for
    // getBrokerConsumerStatsAsync() -- confirm against the source file.
    void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
                                size_t, BrokerConsumerStatsCallback);

    // Returns the first topic name when all topic names are valid, otherwise a null pointer.
    static std::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);

    // Per-topic unsubscribe / subscribe entry points; both take the topic as a string.
    void unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback);
    Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);

   protected:
    const ClientImplWeakPtr client_;  // weak handle, so this object does not keep the client alive
    const std::string subscriptionName_;
    std::string consumerStr_;
    const ConsumerConfiguration conf_;
    typedef SynchronizedHashMap<std::string, ConsumerImplPtr> ConsumerMap;
    ConsumerMap consumers_;                        // string key -> underlying ConsumerImpl
    std::map<std::string, int> topicsPartitions_;  // string key -> int (presumably partition count)
    mutable std::mutex mutex_;
    std::mutex pendingReceiveMutex_;
    UnboundedBlockingQueue<Message> incomingMessages_;
    std::atomic_int incomingMessagesSize_ = {0};
    MessageListener messageListener_;
    DeadlineTimerPtr partitionsUpdateTimer_;
    TimeDuration partitionsUpdateInterval_;
    LookupServicePtr lookupServicePtr_;
    std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
    // NOTE(review): unlike its neighbours this member has no trailing underscore; left unchanged
    // because code outside this header may refer to it by this name.
    std::atomic<Result> failedResult{ResultOk};
    Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
    UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
    const std::vector<std::string> topics_;
    std::queue<ReceiveCallback> pendingReceives_;  // callbacks queued by receiveAsync -- TODO confirm
    const Commands::SubscriptionMode subscriptionMode_;
    boost::optional<MessageId> startMessageId_;
    ConsumerInterceptorsPtr interceptors_;
    std::atomic_bool duringSeek_{false};

    /* methods */
    void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                              unsigned int partitionIndex);
    void notifyResult(CloseCallback closeCallback);
    void messageReceived(Consumer consumer, const Message& msg);
    void messageProcessed(Message& msg);
    void internalListener(Consumer consumer);
    void receiveMessages();
    void failPendingReceiveCallback();
    void notifyPendingReceivedCallback(Result result, const Message& message,
                                       const ReceiveCallback& callback);
    void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
                                  std::shared_ptr<std::atomic<int>> topicsNeedCreate);
    void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, const std::string& consumerName,
                                  ConsumerSubResultPromisePtr topicSubResultPromise);
    void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                     std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
                                     ConsumerSubResultPromisePtr topicSubResultPromise);
    void handleUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                 ResultCallback callback);
    void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                         int numberPartitions, TopicNamePtr topicNamePtr,
                                         std::string& topicPartitionName, ResultCallback callback);
    void runPartitionUpdateTask();
    void topicPartitionUpdate();
    void handleGetPartitions(TopicNamePtr topicName, Result result,
                             const LookupDataResultPtr& lookupDataResult, int currentNumPartitions);
    void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
                                    ConsumerSubResultPromisePtr topicSubResultPromise,
                                    std::shared_ptr<std::atomic<int>> partitionsNeedCreate);

    // Virtual methods implemented from the consumer base class
    bool hasEnoughMessagesForBatchReceive() const override;
    void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
    void beforeConnectionChange(ClientConnection& cnx) override;

    friend class PulsarFriend;

   private:
    std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
    void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
    void cancelTimers() noexcept;

    /**
     * Weak reference to this object, typed as MultiTopicsConsumerImpl.
     *
     * shared_from_this() throws std::bad_weak_ptr when no shared_ptr currently owns the object.
     * Because this function is noexcept, letting that escape would call std::terminate, so the
     * exception is caught and an empty weak_ptr is returned instead.
     */
    std::weak_ptr<MultiTopicsConsumerImpl> weak_from_this() noexcept {
        try {
            return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
        } catch (const std::bad_weak_ptr&) {
            return std::weak_ptr<MultiTopicsConsumerImpl>{};
        }
    }

    // Test fixtures that are granted access to non-public members.
    FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
    FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
    FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
    FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
    FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
};
// Shared-ownership handle for MultiTopicsConsumerImpl.
using MultiTopicsConsumerImplPtr = std::shared_ptr<MultiTopicsConsumerImpl>;
} // namespace pulsar
#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER