// blob: 8eb3b1351d9253014f60608e3897f33e2f281de8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MQClient.h"
#include "Logging.h"
#include "MQClientFactory.h"
#include "MQClientManager.h"
#include "TopicPublishInfo.h"
#include "UtilAll.h"
#include "NameSpaceUtil.h"
namespace rocketmq {
// Client version and build date; both are pasted into the string literal below
// at compile time via adjacent-literal concatenation.
#define ROCKETMQCPP_VERSION "1.2.4"
#define BUILD_DATE "11-11-2019"
// Version banner kept as a global so it can be found in the built library.
// To display it: strings bin/librocketmq.so | grep VERSION
const char* rocketmq_build_time = "VERSION: " ROCKETMQCPP_VERSION ", BUILD DATE: " BUILD_DATE " ";
//<!************************************************************************
// Builds a client with default settings:
//  - name server address: value of the NAMESRV_ADDR environment variable when
//    it is set, otherwise the empty string;
//  - instance name "DEFAULT", no client factory yet, service state CREATE_JUST;
//  - pull thread count: hardware thread count reported by the standard
//    library, but never less than 1;
//  - TCP connect timeout 3000 (3s) and transport try-lock timeout 3 (seconds);
//  - empty unit name.
// NOTE(review): unlike setNamesrvAddr(), the environment value is stored as-is
// and is not passed through NameSpaceUtil::formatNameServerURL - confirm this
// is intended.
MQClient::MQClient() {
  const char* envNamesrvAddr = getenv("NAMESRV_ADDR");
  if (envNamesrvAddr != NULL) {
    m_namesrvAddr = envNamesrvAddr;
  } else {
    m_namesrvAddr = "";
  }
  m_instanceName = "DEFAULT";
  m_clientFactory = NULL;
  m_serviceState = CREATE_JUST;
  // std::thread::hardware_concurrency() is allowed to return 0 when the value
  // cannot be determined. setTcpTransportPullThreadNum() only ever raises the
  // count, so without this fallback the default could silently stay at zero.
  m_pullThreadNum = std::thread::hardware_concurrency();
  if (m_pullThreadNum < 1) {
    m_pullThreadNum = 1;
  }
  m_tcpConnectTimeout = 3000;        // 3s
  m_tcpTransportTryLockTimeout = 3;  // 3s
  m_unitName = "";
}
// Nothing to release explicitly here: the destructor body is empty and does
// not touch the client factory pointer.
MQClient::~MQClient() = default;
// Returns the client identifier, formatted as
// "<process id>-<local address>@<instance name>".
string MQClient::getMQClientId() const {
  const string localAddress = UtilAll::getLocalAddress();
  string clientId = UtilAll::to_string(getpid());
  clientId.append("-").append(localAddress).append("@").append(m_instanceName);
  return clientId;
}
//<!groupName;
// Returns a reference to the stored group name.
const string& MQClient::getGroupName() const {
  return this->m_GroupName;
}
void MQClient::setGroupName(const string& groupname) {
m_GroupName = groupname;
}
// Returns a reference to the stored name server address.
const string& MQClient::getNamesrvAddr() const {
  return this->m_namesrvAddr;
}
void MQClient::setNamesrvAddr(const string& namesrvAddr) {
m_namesrvAddr = NameSpaceUtil::formatNameServerURL(namesrvAddr);
}
// Returns a reference to the stored name server domain.
const string& MQClient::getNamesrvDomain() const {
  return this->m_namesrvDomain;
}
void MQClient::setNamesrvDomain(const string& namesrvDomain) {
m_namesrvDomain = namesrvDomain;
}
// Returns a reference to the stored instance name ("DEFAULT" unless changed).
const string& MQClient::getInstanceName() const {
  return this->m_instanceName;
}
void MQClient::setInstanceName(const string& instanceName) {
m_instanceName = instanceName;
}
// Forwards a topic-creation request (key, new topic name, queue count and this
// client's session credentials) to the client factory.
// An MQException raised by the factory is logged at error level and NOT
// propagated to the caller.
// NOTE(review): getFactory() is dereferenced without a null check; it returns
// m_clientFactory, which is NULL until start() assigns it and after shutdown().
void MQClient::createTopic(const string& key, const string& newTopic, int queueNum) {
  try {
    getFactory()->createTopic(key, newTopic, queueNum, m_SessionCredentials);
  } catch (MQException& e) {
    // The exception text must be an argument, never the format string itself:
    // a '%' inside the message would otherwise be parsed as a conversion
    // specifier with no matching argument.
    LOG_ERROR("%s", e.what());
  }
}
// Returns whatever the client factory's earliestMsgStoreTime reports for `mq`,
// queried with this client's session credentials.
// The factory pointer is used without a null check.
int64 MQClient::earliestMsgStoreTime(const MQMessageQueue& mq) {
  MQClientFactory* factory = getFactory();
  return factory->earliestMsgStoreTime(mq, m_SessionCredentials);
}
// Delegates a message query to the client factory, passing topic, key, maxNum,
// begin and end through unchanged together with this client's session
// credentials, and returns the factory's result.
// The factory pointer is used without a null check.
QueryResult MQClient::queryMessage(const string& topic, const string& key, int maxNum, int64 begin, int64 end) {
  MQClientFactory* factory = getFactory();
  return factory->queryMessage(topic, key, maxNum, begin, end, m_SessionCredentials);
}
// Returns the client factory's minOffset result for `mq`, queried with this
// client's session credentials. The factory pointer is used without a null
// check.
int64 MQClient::minOffset(const MQMessageQueue& mq) {
  MQClientFactory* factory = getFactory();
  return factory->minOffset(mq, m_SessionCredentials);
}
// Returns the client factory's maxOffset result for `mq`, queried with this
// client's session credentials. The factory pointer is used without a null
// check.
int64 MQClient::maxOffset(const MQMessageQueue& mq) {
  MQClientFactory* factory = getFactory();
  return factory->maxOffset(mq, m_SessionCredentials);
}
// Returns the client factory's searchOffset result for `mq` and `timestamp`,
// queried with this client's session credentials. The factory pointer is used
// without a null check.
int64 MQClient::searchOffset(const MQMessageQueue& mq, uint64_t timestamp) {
  MQClientFactory* factory = getFactory();
  return factory->searchOffset(mq, timestamp, m_SessionCredentials);
}
// Returns the pointer produced by the client factory's viewMessage for
// `msgId`, queried with this client's session credentials.
// NOTE(review): ownership of the returned MQMessageExt is not visible from
// this file - confirm against the factory before deleting or retaining it.
MQMessageExt* MQClient::viewMessage(const string& msgId) {
  MQClientFactory* factory = getFactory();
  return factory->viewMessage(msgId, m_SessionCredentials);
}
// Looks up the publish info for `topic` through the client factory and returns
// its message queue list.
// Throws MQClientException (code -1) when no publish info could be obtained.
// The factory pointer is used without a null check.
vector<MQMessageQueue> MQClient::getTopicMessageQueueInfo(const string& topic) {
  MQClientFactory* factory = getFactory();
  boost::weak_ptr<TopicPublishInfo> weakInfo(factory->tryToFindTopicPublishInfo(topic, m_SessionCredentials));
  boost::shared_ptr<TopicPublishInfo> publishInfo = weakInfo.lock();
  if (!publishInfo) {
    // Lookup yielded nothing usable: report it to the caller.
    THROW_MQEXCEPTION(MQClientException, "could not find MessageQueue Info of topic: [" + topic + "].", -1);
  }
  return publishInfo->getMessageQueueList();
}
// Obtains a client factory from MQClientManager when none is attached yet
// (keyed by the client id and configured with the pull thread count, connect
// timeout, try-lock timeout and unit name), then logs the client's identity.
// This function does not modify m_serviceState.
void MQClient::start() {
  if (!getFactory()) {
    m_clientFactory = MQClientManager::getInstance()->getMQClientFactory(getMQClientId(),
                                                                         m_pullThreadNum,
                                                                         m_tcpConnectTimeout,
                                                                         m_tcpTransportTryLockTimeout,
                                                                         m_unitName);
  }
  LOG_INFO(
      "MQClient "
      "start,groupname:%s,clientID:%s,instanceName:%s,nameserveraddr:%s",
      getGroupName().c_str(),
      getMQClientId().c_str(),
      getInstanceName().c_str(),
      getNamesrvAddr().c_str());
}
void MQClient::shutdown() {
m_clientFactory = NULL;
}
// Returns the currently attached client factory; NULL before start() has
// assigned one and after shutdown() has cleared it.
MQClientFactory* MQClient::getFactory() const {
  return this->m_clientFactory;
}
// Reports whether the service state is exactly RUNNING.
bool MQClient::isServiceStateOk() {
  const bool isRunning = (m_serviceState == RUNNING);
  return isRunning;
}
// Forwards `inputLevel` to the log adapter reached through ALOG_ADAPTER.
// The setting lives in the adapter, not in this MQClient object.
void MQClient::setLogLevel(elogLevel inputLevel) {
ALOG_ADAPTER->setLogLevel(inputLevel);
}
// Returns the log level currently reported by the log adapter reached through
// ALOG_ADAPTER.
elogLevel MQClient::getLogLevel() {
return ALOG_ADAPTER->getLogLevel();
}
// Forwards the log file count and per-file size to the log adapter.
// Note the argument order: despite the "SizeAndNum" name, the file count comes
// first and the per-file size second, matching the adapter's
// setLogFileNumAndSize(fileNum, perFileSize).
// NOTE(review): the unit of perFileSize is not visible from this file - confirm
// against the log adapter.
void MQClient::setLogFileSizeAndNum(int fileNum, long perFileSize) {
ALOG_ADAPTER->setLogFileNumAndSize(fileNum, perFileSize);
}
// Raises the pull thread count to `num`. Requests that are not strictly larger
// than the current count are ignored, so this setter can only increase it.
void MQClient::setTcpTransportPullThreadNum(int num) {
  if (num <= m_pullThreadNum) {
    return;  // never lower the current value
  }
  m_pullThreadNum = num;
}
const int MQClient::getTcpTransportPullThreadNum() const {
return m_pullThreadNum;
}
void MQClient::setTcpTransportConnectTimeout(uint64_t timeout) {
m_tcpConnectTimeout = timeout;
}
const uint64_t MQClient::getTcpTransportConnectTimeout() const {
return m_tcpConnectTimeout;
}
// Sets the transport try-lock timeout. `timeout` is raised to 1000 if it is
// smaller, then divided by 1000 (integer division, remainder dropped) before
// being stored. The stored value is therefore always at least 1, and it is
// 1/1000 of the unit the caller passed in.
void MQClient::setTcpTransportTryLockTimeout(uint64_t timeout) {
  const uint64_t minimumTimeout = 1000;
  const uint64_t effectiveTimeout = (timeout < minimumTimeout) ? minimumTimeout : timeout;
  m_tcpTransportTryLockTimeout = effectiveTimeout / 1000;
}
const uint64_t MQClient::getTcpTransportTryLockTimeout() const {
return m_tcpTransportTryLockTimeout;
}
// Stores `unitName`; the value is handed to the client manager when start()
// requests a factory.
void MQClient::setUnitName(string unitName) {
  this->m_unitName = unitName;
}
// Returns a reference to the stored unit name (empty unless set).
const string& MQClient::getUnitName() {
  return this->m_unitName;
}
void MQClient::setSessionCredentials(const string& input_accessKey,
const string& input_secretKey,
const string& input_onsChannel) {
m_SessionCredentials.setAccessKey(input_accessKey);
m_SessionCredentials.setSecretKey(input_secretKey);
m_SessionCredentials.setAuthChannel(input_onsChannel);
}
// Returns a read-only reference to the stored session credentials.
const SessionCredentials& MQClient::getSessionCredentials() const {
  return this->m_SessionCredentials;
}
//<!************************************************************************
} //<!end namespace;