| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef __DEFAULTMQPUSHCONSUMERIMPL_H__ |
| #define __DEFAULTMQPUSHCONSUMERIMPL_H__ |
| |
| #include <boost/asio.hpp> |
| #include <boost/asio/io_service.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/date_time/posix_time/posix_time.hpp> |
| #include <boost/scoped_ptr.hpp> |
| #include <boost/thread/thread.hpp> |
| #include <string> |
| #include "AsyncCallback.h" |
| #include "ConsumeMessageContext.h" |
| #include "ConsumeMessageHook.h" |
| #include "DefaultMQProducerImpl.h" |
| #include "MQConsumer.h" |
| #include "MQMessageListener.h" |
| #include "MQMessageQueue.h" |
| |
| namespace rocketmq { |
| |
| class Rebalance; |
| class SubscriptionData; |
| class OffsetStore; |
| class PullAPIWrapper; |
| class PullRequest; |
| class ConsumeMsgService; |
| class TaskQueue; |
| class TaskThread; |
| class AsyncPullCallback; |
| class ConsumerRunningInfo; |
| //<!*************************************************************************** |
| class DefaultMQPushConsumerImpl : public MQConsumer { |
| public: |
| DefaultMQPushConsumerImpl(); |
| DefaultMQPushConsumerImpl(const std::string& groupname); |
| void boost_asio_work(); |
| virtual ~DefaultMQPushConsumerImpl(); |
| |
| //<!begin mqadmin; |
| virtual void start(); |
| virtual void shutdown(); |
| //<!end mqadmin; |
| |
| //<!begin MQConsumer |
| virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName); |
| virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs); |
| virtual void doRebalance(); |
| virtual void persistConsumerOffset(); |
| virtual void persistConsumerOffsetByResetOffset(); |
| virtual void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info); |
| virtual ConsumeType getConsumeType(); |
| virtual ConsumeFromWhere getConsumeFromWhere(); |
| void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere); |
| virtual void getSubscriptions(std::vector<SubscriptionData>&); |
| virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset); |
| virtual void removeConsumeOffset(const MQMessageQueue& mq); |
| virtual PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64 offset, int maxNums) { |
| return PullResult(); |
| } |
| virtual void pull(const MQMessageQueue& mq, |
| const std::string& subExpression, |
| int64 offset, |
| int maxNums, |
| PullCallback* pPullCallback) {} |
| virtual ConsumerRunningInfo* getConsumerRunningInfo(); |
| //<!end MQConsumer; |
| |
| void registerMessageListener(MQMessageListener* pMessageListener); |
| MessageListenerType getMessageListenerType(); |
| void subscribe(const std::string& topic, const std::string& subExpression); |
| |
| OffsetStore* getOffsetStore() const; |
| virtual Rebalance* getRebalance() const; |
| ConsumeMsgService* getConsumerMsgService() const; |
| |
| virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>); |
| virtual bool producePullMsgTaskLater(boost::weak_ptr<PullRequest>, int millis); |
| static void static_triggerNextPullRequest(void* context, |
| boost::asio::deadline_timer* t, |
| boost::weak_ptr<PullRequest>); |
| void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr<PullRequest>); |
| void runPullMsgQueue(TaskQueue* pTaskQueue); |
| void pullMessage(boost::weak_ptr<PullRequest> pullrequest); |
| void pullMessageAsync(boost::weak_ptr<PullRequest> pullrequest); |
| void setAsyncPull(bool asyncFlag); |
| AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr<PullRequest>, MQMessageQueue msgQueue); |
| void shutdownAsyncPullCallBack(); |
| |
| /* |
| for orderly consume, set the pull num of message size by each pullMsg, |
| default value is 1; |
| */ |
| void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize); |
| int getConsumeMessageBatchMaxSize() const; |
| |
| /* |
| set consuming thread count, default value is cpu cores |
| */ |
| void setConsumeThreadCount(int threadCount); |
| int getConsumeThreadCount() const; |
| void setMaxReconsumeTimes(int maxReconsumeTimes); |
| int getMaxReconsumeTimes() const; |
| |
| /* |
| set pullMsg thread count, default value is cpu cores |
| */ |
| void setPullMsgThreadPoolCount(int threadCount); |
| int getPullMsgThreadPoolCount() const; |
| |
| /* |
| set max cache msg size perQueue in memory if consumer could not consume msgs |
| immediately |
| default maxCacheMsgSize perQueue is 1000, set range is:1~65535 |
| */ |
| void setMaxCacheMsgSizePerQueue(int maxCacheSize); |
| int getMaxCacheMsgSizePerQueue() const; |
| void submitSendTraceRequest(MQMessage& msg, SendCallback* pSendCallback); |
| bool hasConsumeMessageHook(); |
| |
| void registerConsumeMessageHook(std::shared_ptr<ConsumeMessageHook>& hook); |
| void setDefaultMqProducerImpl(DefaultMQProducerImpl* DefaultMqProducerImpl); |
| void executeConsumeMessageHookBefore(ConsumeMessageContext* context); |
| void executeConsumeMessageHookAfter(ConsumeMessageContext* context); |
| |
| private: |
| void checkConfig(); |
| void copySubscription(); |
| void updateTopicSubscribeInfoWhenSubscriptionChanged(); |
| bool dealWithNameSpace(); |
| void logConfigs(); |
| |
| bool dealWithMessageTrace(); |
| void createMessageTraceInnerProducer(); |
| void shutdownMessageTraceInnerProducer(); |
| |
| private: |
| uint64_t m_startTime; |
| ConsumeFromWhere m_consumeFromWhere; |
| std::map<std::string, std::string> m_subTopics; |
| int m_consumeThreadCount; |
| OffsetStore* m_pOffsetStore; |
| Rebalance* m_pRebalance; |
| PullAPIWrapper* m_pPullAPIWrapper; |
| ConsumeMsgService* m_consumerService; |
| MQMessageListener* m_pMessageListener; |
| int m_consumeMessageBatchMaxSize; |
| int m_maxMsgCacheSize; |
| int m_maxReconsumeTimes = -1; |
| boost::asio::io_service m_async_ioService; |
| boost::scoped_ptr<boost::thread> m_async_service_thread; |
| |
| typedef std::map<MQMessageQueue, AsyncPullCallback*> PullMAP; |
| PullMAP m_PullCallback; |
| bool m_asyncPull; |
| int m_asyncPullTimeout; |
| int m_pullMsgThreadPoolNum; |
| |
| private: |
| TaskQueue* m_pullmsgQueue; |
| std::unique_ptr<boost::thread> m_pullmsgThread; |
| |
| // used for trace |
| std::vector<std::shared_ptr<ConsumeMessageHook> > m_consumeMessageHookList; |
| std::shared_ptr<DefaultMQProducerImpl> m_DefaultMQProducerImpl; |
| }; |
| //<!*************************************************************************** |
| } // namespace rocketmq |
| #endif |