// blob: 8d50269af5339e5cd773675f479b9f15f2c8dff3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
#define TUBEMQ_CLIENT_BASE_CLIENT_H_
#include <stdint.h>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "connection_pool.h"
#include "file_ini.h"
#include "noncopyable.h"
#include "rmt_data_cache.h"
#include "thread_pool.h"
#include "tubemq/tubemq_atomic.h"
#include "tubemq/tubemq_config.h"
#include "tubemq/tubemq_message.h"
#include "tubemq/tubemq_return.h"
namespace tubemq {
// Bring the standard-library names used by the declarations below into
// namespace tubemq. Because this is a header, every file that includes it
// also sees these names inside namespace tubemq.
using std::map;
using std::mutex;
using std::string;
using std::thread;
/**
 * Polymorphic base for client objects.
 *
 * Holds two pieces of state: a producer flag and a client index that can be
 * assigned through SetClientIndex(). The constructor and destructor are
 * declared here and defined out of line.
 */
class BaseClient {
 public:
  /// @param is_producer  role flag for this client.
  /// NOTE(review): presumably stored into is_producer_ by the out-of-line
  /// constructor -- confirm in the implementation file.
  explicit BaseClient(bool is_producer);

  /// Virtual so that derived clients can be destroyed through a base pointer.
  virtual ~BaseClient();

  /// Shutdown hook. The base implementation does nothing; derived classes
  /// may override it.
  virtual void ShutDown() {}

  /// Stores @p client_index as this client's index.
  void SetClientIndex(int32_t client_index) { client_index_ = client_index; }

  /// Returns the producer flag. Const-qualified so it is callable through a
  /// const reference or pointer.
  bool IsProducer() const { return is_producer_; }

  /// Returns the current client index by value. Const-qualified so it is
  /// callable through a const reference or pointer; the return type carries
  /// no top-level const because it has no effect on a value of built-in type.
  int32_t GetClientIndex() const { return client_index_; }

 protected:
  bool is_producer_;      // producer flag reported by IsProducer()
  int32_t client_index_;  // index written by SetClientIndex()
};
// Shared-ownership handle to a BaseClient. This is the type TubeMQService
// keeps in its client map and takes in AddClientObj()/RmvClientObj().
using BaseClientPtr = std::shared_ptr<BaseClient>;
/**
 * Service object that holds the state shared by TubeMQ clients.
 *
 * As declared here it owns: the service configuration, an atomic status
 * value, a map from client index to client object, two executor pools
 * (timer and network), a connection pool, a thread pool, and a thread plus
 * two address maps whose names refer to DNS translation ("dns_xfs").
 *
 * The constructor and destructor are private; instances are reached through
 * Instance(). Most member functions are defined out of line, so their exact
 * behaviour is not visible from this header.
 */
class TubeMQService : public noncopyable {
 public:
  /// Returns a pointer to a TubeMQService.
  /// NOTE(review): presumably the single instance held in _instance --
  /// confirm in the implementation file.
  static TubeMQService* Instance();

  /// Starts the service from an explicit configuration object.
  /// @param err_info       non-const reference the implementation can write to.
  /// @param serviceConfig  configuration to use.
  /// NOTE(review): presumably returns true on success and fills err_info on
  /// failure -- confirm in the implementation file.
  bool Start(string& err_info, const TubeMQServiceConfig& serviceConfig);

  // Deprecated method: overload that takes a configuration file path, which
  // defaults to "../conf/tubemqclient.conf".
  bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");

  /// Stops the service. @param err_info non-const reference the
  /// implementation can write to.
  bool Stop(string& err_info);

  /// Reports whether the service is running (defined out of line).
  bool IsRunning();

  /// Returns the current value of the atomic service status.
  int32_t GetServiceStatus() const { return service_status_.Get(); }

  /// Returns a count of client objects (defined out of line).
  int32_t GetClientObjCnt();

  /// Adds @p client_obj to the service (defined out of line).
  bool AddClientObj(string& err_info, BaseClientPtr client_obj);

  /// Looks up a client object by @p client_index (defined out of line).
  BaseClientPtr GetClientObj(int32_t client_index) const;

  /// Removes @p client_obj from the service (defined out of line).
  void RmvClientObj(BaseClientPtr client_obj);

  /// Returns a reference to the stored local host string; the reference is
  /// to a member of this object.
  const string& GetLocalHost() const { return local_host_; }

  /// Returns a copy of the timer executor pool handle.
  ExecutorPoolPtr GetTimerExecutorPool() const { return timer_executor_; }

  /// Creates a steady timer on an executor obtained from the timer pool.
  /// timer_executor_ is dereferenced without a null check.
  /// NOTE(review): presumably the pool is created in Start(), making this
  /// valid only after a successful start -- confirm.
  SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }

  /// Returns a copy of the network executor pool handle.
  ExecutorPoolPtr GetNetWorkExecutorPool() const { return network_executor_; }

  /// Returns a copy of the connection pool handle.
  ConnectionPoolPtr GetConnectionPool() const { return connection_pool_; }

  /// Forwards @p f to the thread pool. When no thread pool exists the call
  /// is a no-op and @p f is dropped without being run.
  template <class function>
  void Post(function f) {
    if (thread_pool_ != nullptr) {
      thread_pool_->Post(f);
    }
  }

  /// Registers master address information (defined out of line).
  bool AddMasterAddress(string& err_info, const string& master_info);

  /// Writes into @p target a result derived from @p source (defined out of
  /// line). NOTE(review): presumably the translated form of the master
  /// address -- confirm.
  void GetXfsMasterAddress(const string& source, string& target);

 protected:
  void updMasterAddrByDns();

 private:
  TubeMQService();
  ~TubeMQService();
  void iniServiceConfigure();
  /// NOTE(review): presumably the entry point run by dns_xfs_thread_, with
  /// the period given in milliseconds -- confirm.
  void thread_task_dnsxfs(int dns_xfs_period_ms);
  // Name is misspelled ("Clinets"); kept as is because the definition lives
  // in the implementation file.
  void shutDownClinets() const;
  bool hasXfsTask(map<string, int32_t>& src_addr_map);
  bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
  bool getServiceConfByFile(string& err_info,
                            string conf_file, TubeMQServiceConfig& serviceConfig);

 private:
  static TubeMQService* _instance;
  string local_host_;                        // returned by GetLocalHost()
  TubeMQServiceConfig serviceConfig_;
  AtomicInteger service_status_;             // read by GetServiceStatus()
  AtomicInteger client_index_base_;
  // NOTE(review): presumably guards clients_map_ -- confirm.
  mutable mutex mutex_;
  map<int32_t, BaseClientPtr> clients_map_;  // client objects keyed by int32_t
  ExecutorPoolPtr timer_executor_;           // used by CreateTimer()
  ExecutorPoolPtr network_executor_;
  ConnectionPoolPtr connection_pool_;
  std::shared_ptr<ThreadPool> thread_pool_;  // target of Post(); may be null
  thread dns_xfs_thread_;
  // NOTE(review): presumably guards the master address maps below -- confirm.
  mutable mutex dns_mutex_;
  int64_t last_check_time_;
  map<string, int32_t> master_source_;
  map<string, string> master_target_;
};
} // namespace tubemq
#endif // TUBEMQ_CLIENT_BASE_CLIENT_H_