blob: 28436c5295d8c518ceba165880f4ac6857c2bef8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef TUBEMQ_CLIENT_IMP_CONSUMER_API_H_
#define TUBEMQ_CLIENT_IMP_CONSUMER_API_H_
#include <stdlib.h>
#include <list>
#include <mutex>
#include <string>
#include "BrokerService.pb.h"
#include "MasterService.pb.h"
#include "RPC.pb.h"
#include "client_service.h"
#include "client_subinfo.h"
#include "meta_info.h"
#include "rmt_data_cache.h"
#include "tubemq/tubemq_atomic.h"
#include "tubemq/tubemq_config.h"
#include "tubemq/tubemq_message.h"
#include "tubemq/tubemq_return.h"
#include "tubemq_codec.h"
namespace tubemq {
using std::mutex;
using std::string;
class BaseConsumer : public BaseClient, public std::enable_shared_from_this<BaseConsumer> {
public:
BaseConsumer();
~BaseConsumer();
bool Start(string& err_info, const ConsumerConfig& config);
virtual void ShutDown();
bool GetMessage(ConsumerResult& result);
bool Confirm(const string& confirm_context, bool is_consumed, ConsumerResult& result);
bool GetCurConsumedInfo(map<string, ConsumeOffsetInfo>& consume_info_map);
private:
bool register2Master(int32_t& error_code, string& err_info, bool need_change);
void heartBeat2Master();
void processRebalanceEvent();
void close2Master();
void closeAllBrokers();
private:
string buildUUID();
bool isClientRunning();
bool IsConsumeReady(ConsumerResult& result);
int32_t getConsumeReadStatus(bool is_first_reg);
bool initMasterAddress(string& err_info, const string& master_info);
void getNextMasterAddr(string& ipaddr, int32_t& port);
void getCurrentMasterAddr(string& ipaddr, int32_t& port);
bool needGenMasterCertificateInfo(bool force);
void genBrokerAuthenticInfo(AuthorizedInfo* p_authInfo, bool force);
void processAuthorizedToken(const MasterAuthorizedInfo& authorized_token_info);
void addBrokerHBTimer(const NodeInfo broker);
void reSetBrokerHBTimer(const NodeInfo broker);
private:
void buidRegisterRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidHeartRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidCloseRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidRegisterRequestC2B(const PartitionExt& partition,
TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidUnRegRequestC2B(const PartitionExt& partition,
TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidHeartBeatC2B(const list<PartitionExt>& partitions,
TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidGetMessageC2B(const PartitionExt& partition, TubeMQCodec::ReqProtocolPtr& req_protocol);
void buidCommitC2B(const PartitionExt& partition, bool is_last_consumed,
TubeMQCodec::ReqProtocolPtr& req_protocol);
void genMasterAuthenticateToken(AuthenticateInfo* pauthinfo, const string& username,
const string usrpassword);
bool processRegisterResponseM2C(int32_t& error_code, string& err_info,
const TubeMQCodec::RspProtocolPtr& rsp_protocol);
bool processHBResponseM2C(int32_t& error_code, string& err_info,
const TubeMQCodec::RspProtocolPtr& rsp_protocol);
void processDisConnect2Broker(ConsumerEvent& event);
void processConnect2Broker(ConsumerEvent& event);
void unregister2Brokers(map<NodeInfo, list<PartitionExt> >& unreg_partitions, bool wait_rsp);
bool processRegResponseB2C(int32_t& error_code, string& err_info,
const TubeMQCodec::RspProtocolPtr& rsp_protocol);
void processHeartBeat2Broker(NodeInfo broker_info);
bool processGetMessageRspB2C(ConsumerResult& result, PeerInfo& peer_info, bool filter_consume,
const PartitionExt& partition_ext, const string& confirm_context,
const TubeMQCodec::RspProtocolPtr& rsp_protocol);
void convertMessages(int32_t& msg_size, list<Message>& message_list, bool filter_consume,
const string& topic_name, GetMessageResponseB2C& rsp_b2c);
inline int32_t nextHeartBeatPeriodms();
void asyncRegister2Master(bool need_change);
private:
int32_t client_indexid_;
string client_uuid_;
AtomicInteger status_;
ConsumerConfig config_;
ClientSubInfo sub_info_;
RmtDataCacheCsm rmtdata_cache_;
AtomicLong visit_token_;
mutable mutex auth_lock_;
string authorized_info_;
AtomicBoolean nextauth_2_master;
AtomicBoolean nextauth_2_broker;
string curr_master_addr_;
map<string, int32_t> masters_map_;
AtomicBoolean is_master_actived_;
AtomicInteger master_reg_status_;
int32_t master_sh_retry_cnt_;
int64_t last_master_hbtime_;
int32_t unreport_times_;
// master heartbeat timer
SteadyTimerPtr heart_beat_timer_;
AtomicInteger master_hb_status_;
std::shared_ptr<std::thread> rebalance_thread_ptr_;
// broker heartbeat timer
mutable mutex broker_timer_lock_;
map<NodeInfo, SteadyTimerPtr> broker_timer_map_;
};
using BaseConsumerPtr = std::shared_ptr<BaseConsumer>;
} // namespace tubemq
#endif // TUBEMQ_CLIENT_IMP_CONSUMER_API_H_