blob: eb8249fb5ca0ce917e8b28b21a7702f7ff249740 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "client_service.h"
#include <sstream>
#include "const_config.h"
#include "logger.h"
#include "utils.h"
namespace tubemq {
using std::lock_guard;
using std::stringstream;
const uint32_t ConnectionPool::kRegularTimerSecond;
BaseClient::BaseClient(bool is_producer) {
is_producer_ = is_producer;
client_index_ = tb_config::kInvalidValue;
}
BaseClient::~BaseClient() {
// no code
}
TubeMQService* TubeMQService::_instance = NULL;
static mutex tubemq_mutex_service_;
TubeMQService* TubeMQService::Instance() {
if (NULL == _instance) {
lock_guard<mutex> lck(tubemq_mutex_service_);
if (NULL == _instance) {
_instance = new TubeMQService;
}
}
return _instance;
}
TubeMQService::TubeMQService()
: timer_executor_(std::make_shared<ExecutorPool>(2)),
network_executor_(std::make_shared<ExecutorPool>(4)) {
service_status_.Set(0);
client_index_base_.Set(0);
last_check_time_ = 0;
}
TubeMQService::~TubeMQService() {
string err_info;
Stop(err_info);
}
bool TubeMQService::Start(string& err_info,
const TubeMQServiceConfig& serviceConfig) {
bool result = false;
result = Utils::GetLocalIPV4Address(err_info, local_host_);
if (!result) {
return result;
}
if (!service_status_.CompareAndSet(0, 1)) {
err_info = "TubeMQ Service has startted or Stopped!";
return false;
}
serviceConfig_ = serviceConfig;
iniServiceConfigure();
service_status_.Set(2);
err_info = "Ok!";
LOG_INFO("[TubeMQService] TubeMQ service startted! initial configure is %s ",
serviceConfig.ToString().c_str());
return true;
}
bool TubeMQService::Start(string& err_info, string conf_file) {
// check configure file
TubeMQServiceConfig serviceConfig;
if (!getServiceConfByFile(err_info, conf_file, serviceConfig)) {
return false;
}
return Start(err_info, serviceConfig);
}
bool TubeMQService::Stop(string& err_info) {
if (service_status_.CompareAndSet(2, -1)) {
LOG_INFO("[TubeMQService] TubeMQ service begin to stop!");
if (dns_xfs_thread_.joinable()) {
dns_xfs_thread_.join();
}
shutDownClinets();
connection_pool_ = nullptr;
thread_pool_ = nullptr;
timer_executor_->Close();
network_executor_->Close();
LOG_INFO("[TubeMQService] TubeMQ service stopped!");
}
err_info = "OK!";
return true;
}
bool TubeMQService::IsRunning() { return (service_status_.Get() == 2); }
void TubeMQService::iniServiceConfigure() {
// initial logger parameters
GetLogger().Init(serviceConfig_.GetLogStorePath(),
Logger::Level(serviceConfig_.GetLogPrintLevel()),
serviceConfig_.GetMaxLogFileSize(), serviceConfig_.GetMaxLogFileNum());
// initial dns translate thread
dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs,
this, serviceConfig_.GetDnsXfsPeriodInMs());
// initial service thread pools
timer_executor_->Resize(serviceConfig_.GetTimerThreads());
network_executor_->Resize(serviceConfig_.GetNetWorkThreads());
thread_pool_ = std::make_shared<ThreadPool>(serviceConfig_.GetSignalThreads());
connection_pool_ = std::make_shared<ConnectionPool>(network_executor_);
}
int32_t TubeMQService::GetClientObjCnt() {
lock_guard<mutex> lck(mutex_);
return clients_map_.size();
}
bool TubeMQService::AddClientObj(string& err_info, BaseClientPtr client_obj) {
if (!IsRunning()) {
err_info = "Service not startted!";
return false;
}
int32_t client_index = client_index_base_.IncrementAndGet();
{
lock_guard<mutex> lck(mutex_);
clients_map_[client_index] = client_obj;
}
client_obj->SetClientIndex(client_index);
err_info = "Ok";
return true;
}
BaseClientPtr TubeMQService::GetClientObj(int32_t client_index) const {
BaseClientPtr client_obj = nullptr;
map<int32_t, BaseClientPtr>::const_iterator it;
lock_guard<mutex> lck(mutex_);
it = clients_map_.find(client_index);
if (it != clients_map_.end()) {
client_obj = it->second;
}
return client_obj;
}
void TubeMQService::RmvClientObj(BaseClientPtr client_obj) {
if (client_obj != nullptr) {
lock_guard<mutex> lck(mutex_);
clients_map_.erase(client_obj->GetClientIndex());
client_obj->SetClientIndex(tb_config::kInvalidValue);
}
}
void TubeMQService::shutDownClinets() const {
map<int32_t, BaseClientPtr>::const_iterator it;
lock_guard<mutex> lck(mutex_);
for (it = clients_map_.begin(); it != clients_map_.end(); it++) {
it->second->ShutDown();
}
}
bool TubeMQService::AddMasterAddress(string& err_info, const string& master_info) {
map<string, int32_t>::iterator it;
map<string, int32_t> tmp_addr_map;
map<string, int32_t> new_addr_map;
Utils::Split(master_info, tmp_addr_map, delimiter::kDelimiterComma, delimiter::kDelimiterColon);
if (tmp_addr_map.empty()) {
err_info = "Illegal parameter: master_info is blank!";
return false;
}
for (it = tmp_addr_map.begin(); it != tmp_addr_map.end(); ++it) {
if (Utils::NeedDnsXfs(it->first)) {
new_addr_map[it->first] = it->second;
}
}
if (new_addr_map.empty()) {
err_info = "Ok";
return true;
}
if (addNeedDnsXfsAddr(new_addr_map)) {
updMasterAddrByDns();
}
err_info = "Ok";
return true;
}
void TubeMQService::GetXfsMasterAddress(const string& source, string& target) {
target = source;
lock_guard<mutex> lck(dns_mutex_);
if (master_source_.find(source) != master_source_.end()) {
target = master_target_[source];
}
}
void TubeMQService::thread_task_dnsxfs(int dns_xfs_period_ms) {
LOG_INFO("[TubeMQService] DSN transfer thread startted!");
while (true) {
if (TubeMQService::Instance()->GetServiceStatus() <= 0) {
break;
}
if ((Utils::GetCurrentTimeMillis() - last_check_time_) >= dns_xfs_period_ms) {
TubeMQService::Instance()->updMasterAddrByDns();
last_check_time_ = Utils::GetCurrentTimeMillis();
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
LOG_INFO("[TubeMQService] DSN transfer thread stopped!");
}
bool TubeMQService::hasXfsTask(map<string, int32_t>& src_addr_map) {
lock_guard<mutex> lck(dns_mutex_);
if (!master_source_.empty()) {
src_addr_map = master_source_;
return true;
}
return false;
}
bool TubeMQService::addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map) {
bool added = false;
map<string, int32_t>::iterator it;
if (!src_addr_map.empty()) {
lock_guard<mutex> lck(dns_mutex_);
for (it = src_addr_map.begin(); it != src_addr_map.end(); it++) {
if (master_source_.find(it->first) == master_source_.end()) {
added = true;
master_source_[it->first] = it->second;
}
}
}
return added;
}
void TubeMQService::updMasterAddrByDns() {
map<string, int32_t> tmp_src_addr_map;
map<string, string> tmp_tgt_addr_map;
map<string, string>::iterator it;
if (!hasXfsTask(tmp_src_addr_map)) {
return;
}
Utils::XfsAddrByDns(tmp_src_addr_map, tmp_tgt_addr_map);
lock_guard<mutex> lck(dns_mutex_);
for (it = tmp_tgt_addr_map.begin(); it != tmp_tgt_addr_map.end(); it++) {
master_target_[it->first] = it->second;
}
}
bool TubeMQService::getServiceConfByFile(string& err_info,
string conf_file, TubeMQServiceConfig& serviceConfig) {
// check configure file
bool result = false;
Fileini fileini;
string sector = "TubeMQ";
result = Utils::ValidConfigFile(err_info, conf_file);
if (!result) {
return result;
}
result = fileini.Loadini(err_info, conf_file);
if (!result) {
return result;
}
// get log paremeters
int32_t log_num = 10;
int32_t log_size = 100;
int32_t log_level = 4;
string log_path = "../log/tubemq";
fileini.GetValue(err_info, sector, "log_num", log_num, 10);
fileini.GetValue(err_info, sector, "log_size", log_size, 100);
fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq");
fileini.GetValue(err_info, sector, "log_level", log_level, 4);
log_level = TUBEMQ_MID(log_level, 4, 0);
serviceConfig.SetLogCofigInfo(log_num, log_size, log_level, log_path);
// get dns translate period
int32_t dns_xfs_period_ms = 30 * 1000;
fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000);
serviceConfig.SetDnsXfsPeriodInMs(dns_xfs_period_ms);
// get thread pools paremeters
int32_t timer_threads = 2;
int32_t network_threads = 4;
int32_t signal_threads = 8;
fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2);
fileini.GetValue(err_info, sector, "network_threads", network_threads, 4);
fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8);
serviceConfig.SetServiceThreads(timer_threads, network_threads, signal_threads);
err_info = "Ok";
return true;
}
} // namespace tubemq