blob: 537dc94db7f78dab158e2fd501482bf925706dbe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tubemq/tubemq_config.h"
#include <sstream>
#include <vector>
#include "const_config.h"
#include "const_rpc.h"
#include "utils.h"
namespace tubemq {
using std::set;
using std::stringstream;
using std::vector;
TubeMQServiceConfig::TubeMQServiceConfig() {
log_num_ = tb_config::kLogNumDef;
log_size_ = tb_config::kLogSizeDefMB;
log_level_ = tb_config::kLogLevelDef;
log_path_ = tb_config::kLogPathDef;
dns_xfs_period_ms_ = tb_config::kDnsXfsPeriodInMsDef;
timer_threads_ = tb_config::kTimerThreadNumDef;
network_threads_ = tb_config::kNetworkThreadNumDef;
signal_threads_ = tb_config::kSignalThreadNumDef;
}
TubeMQServiceConfig::~TubeMQServiceConfig() {
//
}
void TubeMQServiceConfig::SetLogCofigInfo(int32_t log_max_num,
int32_t log_max_size, int32_t log_level, const string& log_path) {
log_num_ = log_max_num;
log_size_ = log_max_size;
log_level_ = log_level;
log_path_ = log_path;
log_level_ = TUBEMQ_MID(log_level, 4, 0);
}
void TubeMQServiceConfig::SetDnsXfsPeriodInMs(int32_t dns_xfs_period_ms) {
dns_xfs_period_ms_ =
TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000);
}
void TubeMQServiceConfig::SetServiceThreads(int32_t timer_threads,
int32_t network_threads, int32_t signal_threads) {
timer_threads_ = TUBEMQ_MID(timer_threads, 50, 2);
network_threads_ = TUBEMQ_MID(network_threads, 50, 4);
signal_threads_ = TUBEMQ_MID(signal_threads, 50, 4);
}
const int32_t TubeMQServiceConfig::GetMaxLogFileNum() const {
return log_num_;
}
const int32_t TubeMQServiceConfig::GetMaxLogFileSize() const {
return log_size_;
}
const int32_t TubeMQServiceConfig::GetLogPrintLevel() const {
return log_level_;
}
const string& TubeMQServiceConfig::GetLogStorePath() const {
return log_path_;
}
const int32_t TubeMQServiceConfig::GetDnsXfsPeriodInMs() const {
return dns_xfs_period_ms_;
}
const int32_t TubeMQServiceConfig::GetTimerThreads() const {
return timer_threads_;
}
const int32_t TubeMQServiceConfig::GetNetWorkThreads() const {
return network_threads_;
}
const int32_t TubeMQServiceConfig::GetSignalThreads() const {
return signal_threads_;
}
const string TubeMQServiceConfig::ToString() const {
stringstream ss;
ss << "TubeMQServiceConfig={log_num_=";
ss << log_num_;
ss << ", log_size_=";
ss << log_size_;
ss << ", log_level_=";
ss << log_level_;
ss << ", log_path_='";
ss << log_path_;
ss << "', dns_xfs_period_ms_=";
ss << dns_xfs_period_ms_;
ss << ", timer_threads_=";
ss << timer_threads_;
ss << ", network_threads_=";
ss << network_threads_;
ss << ", signal_threads_=";
ss << signal_threads_;
ss << "}";
return ss.str();
}
BaseConfig::BaseConfig() {
master_addrinfo_ = "";
auth_enable_ = false;
auth_usrname_ = "";
auth_usrpassword_ = "";
tls_enabled_ = false;
tls_trust_store_path_ = "";
tls_trust_store_password_ = "";
rpc_read_timeout_ms_ = tb_config::kRpcTimoutDefMs;
heartbeat_period_ms_ = tb_config::kHeartBeatPeriodDefMs;
max_heartbeat_retry_times_ = tb_config::kHeartBeatFailRetryTimesDef;
heartbeat_period_afterfail_ms_ = tb_config::kHeartBeatSleepPeriodDefMs;
}
BaseConfig::~BaseConfig() {
//
}
BaseConfig& BaseConfig::operator=(const BaseConfig& target) {
if (this != &target) {
master_addrinfo_ = target.master_addrinfo_;
auth_enable_ = target.auth_enable_;
auth_usrname_ = target.auth_usrname_;
auth_usrpassword_ = target.auth_usrpassword_;
tls_enabled_ = target.tls_enabled_;
tls_trust_store_path_ = target.tls_trust_store_path_;
tls_trust_store_password_ = target.tls_trust_store_password_;
rpc_read_timeout_ms_ = target.rpc_read_timeout_ms_;
heartbeat_period_ms_ = target.heartbeat_period_ms_;
max_heartbeat_retry_times_ = target.max_heartbeat_retry_times_;
heartbeat_period_afterfail_ms_ = target.heartbeat_period_afterfail_ms_;
}
return *this;
}
bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrinfo) {
// check parameter masterAddrInfo
string trimed_master_addr_info = Utils::Trim(master_addrinfo);
if (trimed_master_addr_info.empty()) {
err_info = "Illegal parameter: master_addrinfo is blank!";
return false;
}
if (trimed_master_addr_info.length() > tb_config::kMasterAddrInfoMaxLength) {
stringstream ss;
ss << "Illegal parameter: over max ";
ss << tb_config::kMasterAddrInfoMaxLength;
ss << " length of master_addrinfo parameter!";
err_info = ss.str();
return false;
}
// parse and verify master address info
// master_addrinfo's format like ip1:port1,ip2:port2,ip3:port3
map<string, int32_t> tgt_address_map;
Utils::Split(master_addrinfo, tgt_address_map, delimiter::kDelimiterComma,
delimiter::kDelimiterColon);
if (tgt_address_map.empty()) {
err_info = "Illegal parameter: unrecognized master_addrinfo address information!";
return false;
}
master_addrinfo_ = trimed_master_addr_info;
err_info = "Ok";
return true;
}
bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& trust_store_path,
const string& trust_store_password) {
tls_enabled_ = tls_enable;
if (tls_enable) {
string trimed_trust_store_path = Utils::Trim(trust_store_path);
if (trimed_trust_store_path.empty()) {
err_info = "Illegal parameter: trust_store_path is empty!";
return false;
}
string trimed_trust_store_password = Utils::Trim(trust_store_password);
if (trimed_trust_store_password.empty()) {
err_info = "Illegal parameter: trust_store_password is empty!";
return false;
}
tls_trust_store_path_ = trimed_trust_store_path;
tls_trust_store_password_ = trimed_trust_store_password;
} else {
tls_trust_store_path_ = "";
tls_trust_store_password_ = "";
}
err_info = "Ok";
return true;
}
bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, const string& usr_name,
const string& usr_password) {
auth_enable_ = authentic_enable;
if (authentic_enable) {
string trimed_usr_name = Utils::Trim(usr_name);
if (trimed_usr_name.empty()) {
err_info = "Illegal parameter: usr_name is empty!";
return false;
}
string trimed_usr_password = Utils::Trim(usr_password);
if (trimed_usr_password.empty()) {
err_info = "Illegal parameter: usr_password is empty!";
return false;
}
auth_usrname_ = trimed_usr_name;
auth_usrpassword_ = trimed_usr_password;
} else {
auth_usrname_ = "";
auth_usrpassword_ = "";
}
err_info = "Ok";
return true;
}
const string& BaseConfig::GetMasterAddrInfo() const { return master_addrinfo_; }
bool BaseConfig::IsTlsEnabled() { return tls_enabled_; }
const string& BaseConfig::GetTrustStorePath() const { return tls_trust_store_path_; }
const string& BaseConfig::GetTrustStorePassword() const { return tls_trust_store_password_; }
bool BaseConfig::IsAuthenticEnabled() { return auth_enable_; }
const string& BaseConfig::GetUsrName() const { return auth_usrname_; }
const string& BaseConfig::GetUsrPassWord() const { return auth_usrpassword_; }
void BaseConfig::SetRpcReadTimeoutMs(int rpc_read_timeout_ms) {
if (rpc_read_timeout_ms >= tb_config::kRpcTimoutMaxMs) {
rpc_read_timeout_ms_ = tb_config::kRpcTimoutMaxMs;
} else if (rpc_read_timeout_ms <= tb_config::kRpcTimoutMinMs) {
rpc_read_timeout_ms_ = tb_config::kRpcTimoutMinMs;
} else {
rpc_read_timeout_ms_ = rpc_read_timeout_ms;
}
}
int32_t BaseConfig::GetRpcReadTimeoutMs() { return rpc_read_timeout_ms_; }
void BaseConfig::SetHeartbeatPeriodMs(int32_t heartbeat_period_ms) {
heartbeat_period_ms_ = heartbeat_period_ms;
}
int32_t BaseConfig::GetHeartbeatPeriodMs() { return heartbeat_period_ms_; }
void BaseConfig::SetMaxHeartBeatRetryTimes(int32_t max_heartbeat_retry_times) {
max_heartbeat_retry_times_ = max_heartbeat_retry_times;
}
int32_t BaseConfig::GetMaxHeartBeatRetryTimes() { return max_heartbeat_retry_times_; }
void BaseConfig::SetHeartbeatPeriodAftFailMs(int32_t heartbeat_period_afterfail_ms) {
heartbeat_period_afterfail_ms_ = heartbeat_period_afterfail_ms;
}
int32_t BaseConfig::GetHeartbeatPeriodAftFailMs() { return heartbeat_period_afterfail_ms_; }
const string BaseConfig::ToString() const {
stringstream ss;
ss << "BaseConfig={master_addrinfo_='";
ss << master_addrinfo_;
ss << "', authEnable=";
ss << auth_enable_;
ss << ", auth_usrname_='";
ss << auth_usrname_;
ss << "', auth_usrpassword_='";
ss << auth_usrpassword_;
ss << "', tls_enabled_=";
ss << tls_enabled_;
ss << ", tls_trust_store_path_='";
ss << tls_trust_store_path_;
ss << "', tls_trust_store_password_='";
ss << tls_trust_store_password_;
ss << "', rpc_read_timeout_ms_=";
ss << rpc_read_timeout_ms_;
ss << ", heartbeat_period_ms_=";
ss << heartbeat_period_ms_;
ss << ", max_heartbeat_retry_times_=";
ss << max_heartbeat_retry_times_;
ss << ", heartbeat_period_afterfail_ms_=";
ss << heartbeat_period_afterfail_ms_;
ss << "}";
return ss.str();
}
ConsumerConfig::ConsumerConfig() {
group_name_ = "";
is_bound_consume_ = false;
session_key_ = "";
source_count_ = 0;
is_select_big_ = true;
consume_position_ = kConsumeFromLatestOffset;
is_rollback_if_confirm_timout_ = true;
max_subinfo_report_intvl_ = tb_config::kSubInfoReportMaxIntervalTimes;
max_part_check_period_ms_ = tb_config::kMaxPartCheckPeriodMsDef;
part_check_slice_ms_ = tb_config::kPartCheckSliceMsDef;
msg_notfound_wait_period_ms_ = tb_config::kMsgNotfoundWaitPeriodMsDef;
reb_confirm_wait_period_ms_ = tb_config::kRebConfirmWaitPeriodMsDef;
max_confirm_wait_period_ms_ = tb_config::kConfirmWaitPeriodMsMax;
shutdown_reb_wait_period_ms_ = tb_config::kRebWaitPeriodWhenShutdownMs;
}
ConsumerConfig::~ConsumerConfig() {
//
}
ConsumerConfig& ConsumerConfig::operator=(const ConsumerConfig& target) {
if (this != &target) {
// parent class
BaseConfig::operator=(target);
// child class
group_name_ = target.group_name_;
sub_topic_and_filter_map_ = target.sub_topic_and_filter_map_;
is_bound_consume_ = target.is_bound_consume_;
session_key_ = target.session_key_;
source_count_ = target.source_count_;
is_select_big_ = target.is_select_big_;
part_offset_map_ = target.part_offset_map_;
consume_position_ = target.consume_position_;
max_subinfo_report_intvl_ = target.max_subinfo_report_intvl_;
max_part_check_period_ms_ = target.max_part_check_period_ms_;
part_check_slice_ms_ = target.part_check_slice_ms_;
msg_notfound_wait_period_ms_ = target.msg_notfound_wait_period_ms_;
is_rollback_if_confirm_timout_ = target.is_rollback_if_confirm_timout_;
reb_confirm_wait_period_ms_ = target.reb_confirm_wait_period_ms_;
max_confirm_wait_period_ms_ = target.max_confirm_wait_period_ms_;
shutdown_reb_wait_period_ms_ = target.shutdown_reb_wait_period_ms_;
}
return *this;
}
bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, const string& group_name,
const set<string>& subscribed_topicset) {
string tgt_group_name;
bool is_success = Utils::ValidGroupName(err_info, group_name, tgt_group_name);
if (!is_success) {
return false;
}
if (subscribed_topicset.empty()) {
err_info = "Illegal parameter: subscribed_topicset is empty!";
return false;
}
string topic_name;
map<string, set<string> > tmp_sub_map;
for (set<string>::iterator it = subscribed_topicset.begin(); it != subscribed_topicset.end();
++it) {
topic_name = Utils::Trim(*it);
is_success =
Utils::ValidString(err_info, topic_name, false, true, true, tb_config::kTopicNameMaxLength);
if (!is_success) {
err_info = "Illegal parameter: subscribed_topicset's item error, " + err_info;
return false;
}
set<string> tmp_filters;
tmp_sub_map[topic_name] = tmp_filters;
}
is_bound_consume_ = false;
group_name_ = tgt_group_name;
sub_topic_and_filter_map_ = tmp_sub_map;
err_info = "Ok";
return true;
}
bool ConsumerConfig::SetGroupConsumeTarget(
string& err_info, const string& group_name,
const map<string, set<string> >& subscribed_topic_and_filter_map) {
string session_key;
int source_count = 0;
bool is_select_big = false;
map<string, int64_t> part_offset_map;
return setGroupConsumeTarget(err_info, false, group_name, subscribed_topic_and_filter_map,
session_key, source_count, is_select_big, part_offset_map);
}
bool ConsumerConfig::SetGroupConsumeTarget(
string& err_info, const string& group_name,
const map<string, set<string> >& subscribed_topic_and_filter_map, const string& session_key,
uint32_t source_count, bool is_select_big, const map<string, int64_t>& part_offset_map) {
return setGroupConsumeTarget(err_info, true, group_name, subscribed_topic_and_filter_map,
session_key, source_count, is_select_big, part_offset_map);
}
bool ConsumerConfig::setGroupConsumeTarget(
string& err_info, bool is_bound_consume, const string& group_name,
const map<string, set<string> >& subscribed_topic_and_filter_map, const string& session_key,
uint32_t source_count, bool is_select_big, const map<string, int64_t>& part_offset_map) {
// check parameter group_name
string tgt_group_name;
bool is_success = Utils::ValidGroupName(err_info, group_name, tgt_group_name);
if (!is_success) {
return false;
}
// check parameter subscribed_topic_and_filter_map
if (subscribed_topic_and_filter_map.empty()) {
err_info = "Illegal parameter: subscribed_topic_and_filter_map is empty!";
return false;
}
map<string, set<string> > tmp_sub_map;
map<string, set<string> >::const_iterator it_map;
for (it_map = subscribed_topic_and_filter_map.begin();
it_map != subscribed_topic_and_filter_map.end(); ++it_map) {
uint32_t count = 0;
string tmp_filteritem;
set<string> tgt_filters;
// check topic_name info
is_success = Utils::ValidString(err_info, it_map->first, false, true, true,
tb_config::kTopicNameMaxLength);
if (!is_success) {
stringstream ss;
ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
ss << it_map->first;
ss << " ";
ss << err_info;
err_info = ss.str();
return false;
}
string topic_name = Utils::Trim(it_map->first);
// check filter info
set<string> subscribed_filters = it_map->second;
for (set<string>::iterator it = subscribed_filters.begin(); it != subscribed_filters.end();
++it) {
is_success = Utils::ValidFilterItem(err_info, *it, tmp_filteritem);
if (!is_success) {
stringstream ss;
ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
ss << topic_name;
ss << "'s filter item ";
ss << err_info;
err_info = ss.str();
return false;
}
tgt_filters.insert(tmp_filteritem);
count++;
}
if (count > tb_config::kFilterItemMaxCount) {
stringstream ss;
ss << "Check parameter subscribed_topic_and_filter_map error: topic ";
ss << it_map->first;
ss << "'s filter item over max item count : ";
ss << tb_config::kFilterItemMaxCount;
err_info = ss.str();
return false;
}
tmp_sub_map[topic_name] = tgt_filters;
}
// check if bound consume
if (!is_bound_consume) {
is_bound_consume_ = false;
group_name_ = tgt_group_name;
sub_topic_and_filter_map_ = tmp_sub_map;
err_info = "Ok";
return true;
}
// check session_key
string tgt_session_key = Utils::Trim(session_key);
if (tgt_session_key.length() == 0
|| tgt_session_key.length() > tb_config::kSessionKeyMaxLength) {
if (tgt_session_key.length() == 0) {
err_info = "Illegal parameter: session_key is empty!";
} else {
stringstream ss;
ss << "Illegal parameter: session_key's length over max length ";
ss << tb_config::kSessionKeyMaxLength;
err_info = ss.str();
}
return false;
}
// check part_offset_map
string part_key;
map<string, int64_t> tmp_parts_map;
map<string, int64_t>::const_iterator it_part;
for (it_part = part_offset_map.begin(); it_part != part_offset_map.end(); ++it_part) {
vector<string> result;
Utils::Split(it_part->first, result, delimiter::kDelimiterColon);
if (result.size() != 3) {
stringstream ss;
ss << "Illegal parameter: part_offset_map's key ";
ss << it_part->first;
ss << " format error, value must be aaaa:bbbb:cccc !";
err_info = ss.str();
return false;
}
if (tmp_sub_map.find(result[1]) == tmp_sub_map.end()) {
stringstream ss;
ss << "Illegal parameter: ";
ss << it_part->first;
ss << " subscribed topic ";
ss << result[1];
ss << " not included in subscribed_topic_and_filter_map's topic list!";
err_info = ss.str();
return false;
}
if (it_part->first.find_first_of(delimiter::kDelimiterComma) != string::npos) {
stringstream ss;
ss << "Illegal parameter: key ";
ss << it_part->first;
ss << " include ";
ss << delimiter::kDelimiterComma;
ss << " char!";
err_info = ss.str();
return false;
}
if (it_part->second < 0) {
stringstream ss;
ss << "Illegal parameter: ";
ss << it_part->first;
ss << "'s offset must over or equal zero, value is ";
ss << it_part->second;
err_info = ss.str();
return false;
}
Utils::Join(result, delimiter::kDelimiterColon, part_key);
tmp_parts_map[part_key] = it_part->second;
}
// set verified data
is_bound_consume_ = true;
group_name_ = tgt_group_name;
sub_topic_and_filter_map_ = tmp_sub_map;
session_key_ = tgt_session_key;
source_count_ = source_count;
is_select_big_ = is_select_big;
part_offset_map_ = tmp_parts_map;
err_info = "Ok";
return true;
}
const string& ConsumerConfig::GetGroupName() const { return group_name_; }
const map<string, set<string> >& ConsumerConfig::GetSubTopicAndFilterMap() const {
return sub_topic_and_filter_map_;
}
void ConsumerConfig::SetConsumePosition(ConsumePosition consume_from_where) {
consume_position_ = consume_from_where;
}
const ConsumePosition ConsumerConfig::GetConsumePosition() const { return consume_position_; }
const int32_t ConsumerConfig::GetMsgNotFoundWaitPeriodMs() const {
return msg_notfound_wait_period_ms_;
}
void ConsumerConfig::SetMsgNotFoundWaitPeriodMs(int32_t msg_notfound_wait_period_ms) {
msg_notfound_wait_period_ms_ = msg_notfound_wait_period_ms;
}
const uint32_t ConsumerConfig::GetPartCheckSliceMs() const {
return part_check_slice_ms_;
}
void ConsumerConfig::SetPartCheckSliceMs(uint32_t part_check_slice_ms) {
if (part_check_slice_ms >= 0
&& part_check_slice_ms <= 1000) {
part_check_slice_ms_ = part_check_slice_ms;
}
}
const int32_t ConsumerConfig::GetMaxPartCheckPeriodMs() const {
return max_part_check_period_ms_;
}
void ConsumerConfig::SetMaxPartCheckPeriodMs(int32_t max_part_check_period_ms) {
max_part_check_period_ms_ = max_part_check_period_ms;
}
const int32_t ConsumerConfig::GetMaxSubinfoReportIntvl() const {
return max_subinfo_report_intvl_;
}
void ConsumerConfig::SetMaxSubinfoReportIntvl(int32_t max_subinfo_report_intvl) {
max_subinfo_report_intvl_ = max_subinfo_report_intvl;
}
bool ConsumerConfig::IsRollbackIfConfirmTimeout() {
return is_rollback_if_confirm_timout_; }
void ConsumerConfig::setRollbackIfConfirmTimeout(
bool is_rollback_if_confirm_timeout) {
is_rollback_if_confirm_timout_ = is_rollback_if_confirm_timeout;
}
const int ConsumerConfig::GetWaitPeriodIfConfirmWaitRebalanceMs() const {
return reb_confirm_wait_period_ms_;
}
void ConsumerConfig::SetWaitPeriodIfConfirmWaitRebalanceMs(
int32_t reb_confirm_wait_period_ms) {
reb_confirm_wait_period_ms_ = reb_confirm_wait_period_ms;
}
const int32_t ConsumerConfig::GetMaxConfirmWaitPeriodMs() const {
return max_confirm_wait_period_ms_; }
void ConsumerConfig::SetMaxConfirmWaitPeriodMs(
int32_t max_confirm_wait_period_ms) {
max_confirm_wait_period_ms_ = max_confirm_wait_period_ms;
}
const int32_t ConsumerConfig::GetShutdownRebWaitPeriodMs() const {
return shutdown_reb_wait_period_ms_;
}
void ConsumerConfig::SetShutdownRebWaitPeriodMs(
int32_t wait_period_when_shutdown_ms) {
shutdown_reb_wait_period_ms_ = wait_period_when_shutdown_ms;
}
const string ConsumerConfig::ToString() const {
int32_t i = 0;
stringstream ss;
map<string, int64_t>::const_iterator it;
map<string, set<string> >::const_iterator it_map;
// print info
ss << "ConsumerConfig = {";
ss << BaseConfig::ToString();
ss << ", group_name_='";
ss << group_name_;
ss << "', sub_topic_and_filter_map_={";
for (it_map = sub_topic_and_filter_map_.begin();
it_map != sub_topic_and_filter_map_.end(); ++it_map) {
if (i++ > 0) {
ss << ",";
}
ss << "'";
ss << it_map->first;
ss << "'=[";
int32_t j = 0;
set<string> topic_set = it_map->second;
for (set<string>::const_iterator it = topic_set.begin(); it != topic_set.end(); ++it) {
if (j++ > 0) {
ss << ",";
}
ss << "'";
ss << *it;
ss << "'";
}
ss << "]";
}
ss << "}, is_bound_consume_=";
ss << is_bound_consume_;
ss << ", session_key_='";
ss << session_key_;
ss << "', source_count_=";
ss << source_count_;
ss << ", is_select_big_=";
ss << is_select_big_;
ss << ", part_offset_map_={";
i = 0;
for (it = part_offset_map_.begin(); it != part_offset_map_.end(); ++it) {
if (i++ > 0) {
ss << ",";
}
ss << "'";
ss << it->first;
ss << "'=";
ss << it->second;
}
ss << "}, consume_position_=";
ss << consume_position_;
ss << ", max_subinfo_report_intvl_=";
ss << max_subinfo_report_intvl_;
ss << ", msg_notfound_wait_period_ms_=";
ss << msg_notfound_wait_period_ms_;
ss << ", max_part_check_period_ms_=";
ss << max_part_check_period_ms_;
ss << ", part_check_slice_ms_=";
ss << part_check_slice_ms_;
ss << ", is_rollback_if_confirm_timout_=";
ss << is_rollback_if_confirm_timout_;
ss << ", reb_confirm_wait_period_ms_=";
ss << reb_confirm_wait_period_ms_;
ss << ", max_confirm_wait_period_ms_=";
ss << max_confirm_wait_period_ms_;
ss << ", shutdown_reb_wait_period_ms_=";
ss << shutdown_reb_wait_period_ms_;
ss << "}";
return ss.str();
}
} // namespace tubemq