feat(version): add api to get SDK versions (#261)
* feat(version): add api to get SDK versions
* chore(style): code format for versions get.
* test(unittest): add test case for version get
* test(unittest): add test case for version get
* chore(style): format sdk version style
* chore(style): format sdk version style
* feat(config): show the current config when client start
* feat(config): show the current config when client start
diff --git a/include/CCommon.h b/include/CCommon.h
index 0fbcbda..fa6edb9 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -25,6 +25,8 @@
#define MAX_MESSAGE_ID_LENGTH 256
#define MAX_TOPIC_LENGTH 512
#define MAX_BROKER_NAME_ID_LENGTH 256
+#define MAX_SDK_VERSION_LENGTH 256
+#define DEFAULT_SDK_VERSION "DefaultVersion"
typedef enum _CStatus_ {
// Success
OK = 0,
diff --git a/include/CProducer.h b/include/CProducer.h
index 09b23ba..fa6a730 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -47,6 +47,7 @@
ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer);
ROCKETMQCLIENT_API int StartProducer(CProducer* producer);
ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer);
+ROCKETMQCLIENT_API const char* ShowProducerVersion(CProducer* producer);
ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer* producer, const char* namesrv);
ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer* producer, const char* domain);
diff --git a/include/CPullConsumer.h b/include/CPullConsumer.h
index ec96005..60a3367 100644
--- a/include/CPullConsumer.h
+++ b/include/CPullConsumer.h
@@ -33,6 +33,8 @@
ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer* consumer);
ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer* consumer);
ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API const char* ShowPullConsumerVersion(CPullConsumer* consumer);
+
ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId);
ROCKETMQCLIENT_API const char* GetPullConsumerGroupID(CPullConsumer* consumer);
ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv);
diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index 4880d0f..e2bce18 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -36,6 +36,7 @@
ROCKETMQCLIENT_API int DestroyPushConsumer(CPushConsumer* consumer);
ROCKETMQCLIENT_API int StartPushConsumer(CPushConsumer* consumer);
ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API const char* ShowPushConsumerVersion(CPushConsumer* consumer);
ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId);
ROCKETMQCLIENT_API const char* GetPushConsumerGroupID(CPushConsumer* consumer);
ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer* consumer, const char* namesrv);
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 4614244..991d27d 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -35,6 +35,7 @@
virtual void start();
virtual void shutdown();
+ virtual std::string version();
virtual SendResult send(MQMessage& msg, bool bSelectActiveBroker = false);
virtual SendResult send(MQMessage& msg, const MQMessageQueue& mq);
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 203578f..6ed6c62 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -41,6 +41,7 @@
virtual void start();
virtual void shutdown();
+ virtual std::string version();
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 2b8f672..5b10b3d 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -36,6 +36,7 @@
virtual void start();
virtual void shutdown();
+ virtual std::string version();
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
index 9de728a..f4c0281 100644
--- a/include/TransactionMQProducer.h
+++ b/include/TransactionMQProducer.h
@@ -36,6 +36,7 @@
void start();
void shutdown();
+ std::string version();
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index fdbdb4a..f64818a 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1179,6 +1179,8 @@
runningInfo->setProperty(
ConsumerRunningInfo::PROP_CLIENT_VERSION,
MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion ));
+ runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION,
+ pConsumer->getClientVersionString()); // in DefaultMQClient.cpp;
return runningInfo;
}
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index d36897d..51f554a 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -143,7 +143,6 @@
void doRebalance();
void timerCB_doRebalance(boost::system::error_code& ec, boost::shared_ptr<boost::asio::deadline_timer> t);
bool getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials);
- bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
void eraseConsumerFromTable(const string& consumerName);
int getConsumerTableSize();
void getTopicListFromConsumerSubscription(set<string>& topicList);
@@ -152,7 +151,6 @@
// producer related operation
bool getSessionCredentialFromProducerTable(SessionCredentials& sessionCredentials);
- bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
void eraseProducerFromTable(const string& producerName);
int getProducerTableSize();
void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
@@ -171,6 +169,9 @@
unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;
+ bool addProducerToTable(const string& producerName, MQProducer* pMQProducer);
+ bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer);
+
private:
string m_nameSrvDomain; // per clientId
ServiceState m_serviceState;
diff --git a/src/common/DefaultMQClient.cpp b/src/common/DefaultMQClient.cpp
index f468b7f..39013fc 100644
--- a/src/common/DefaultMQClient.cpp
+++ b/src/common/DefaultMQClient.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "include/DefaultMQClient.h"
+#include "DefaultMQClient.h"
#include "Logging.h"
#include "MQClientFactory.h"
#include "MQClientManager.h"
@@ -24,11 +24,11 @@
#include "UtilAll.h"
namespace rocketmq {
-
+// hard code first.
#define ROCKETMQCPP_VERSION "2.0.0"
-#define BUILD_DATE "02-14-2020"
+#define BUILD_DATE "22:50:18 02-14-2020"
// display version: strings bin/librocketmq.so |grep VERSION
-const char* rocketmq_build_time = "VERSION: " ROCKETMQCPP_VERSION ", BUILD DATE: " BUILD_DATE " ";
+const char* rocketmq_build_time = "CPP CORE VERSION: " ROCKETMQCPP_VERSION ", BUILD TIME: " BUILD_DATE;
//<!************************************************************************
DefaultMQClient::DefaultMQClient() {
@@ -56,6 +56,11 @@
// return processId + "-" + clientIP + "@" + m_instanceName;
return clientIP + "@" + processId + "#" + m_instanceName;
}
+// version
+string DefaultMQClient::getClientVersionString() const {
+ string version(rocketmq_build_time);
+ return version;
+}
//<!groupName;
const string& DefaultMQClient::getGroupName() const {
@@ -222,6 +227,19 @@
const SessionCredentials& DefaultMQClient::getSessionCredentials() const {
return m_SessionCredentials;
}
-
+void DefaultMQClient::showClientConfigs() {
+ // LOG_WARN("*****************************************************************************");
+ LOG_WARN("ClientID:%s", getMQClientId().c_str());
+ LOG_WARN("GroupName:%s", m_GroupName.c_str());
+ LOG_WARN("NameServer:%s", m_namesrvAddr.c_str());
+ LOG_WARN("NameServerDomain:%s", m_namesrvDomain.c_str());
+ LOG_WARN("NameSpace:%s", m_nameSpace.c_str());
+ LOG_WARN("InstanceName:%s", m_instanceName.c_str());
+ LOG_WARN("UnitName:%s", m_unitName.c_str());
+ LOG_WARN("PullThreadNum:%d", m_pullThreadNum);
+ LOG_WARN("TcpConnectTimeout:%lld ms", m_tcpConnectTimeout);
+ LOG_WARN("TcpTransportTryLockTimeout:%lld s", m_tcpTransportTryLockTimeout);
+ // LOG_WARN("*****************************************************************************");
+}
//<!************************************************************************
} // namespace rocketmq
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 0548807..363c5cc 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -17,6 +17,7 @@
#include "DefaultMQPullConsumer.h"
#include "DefaultMQPullConsumerImpl.h"
+#include "MQVersion.h"
namespace rocketmq {
@@ -34,7 +35,14 @@
void DefaultMQPullConsumer::shutdown() {
impl->shutdown();
}
-
+std::string DefaultMQPullConsumer::version() {
+ std::string versions = impl->getClientVersionString();
+ /*versions.append(", PROTOCOL VERSION: ")
+ .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion))
+ .append(", LANGUAGE: ")
+ .append(MQVersion::s_CurrentLanguage);*/
+ return versions;
+}
// start mqclient set
const std::string& DefaultMQPullConsumer::getNamesrvAddr() const {
return impl->getNamesrvAddr();
diff --git a/src/consumer/DefaultMQPullConsumerImpl.cpp b/src/consumer/DefaultMQPullConsumerImpl.cpp
index 6408842..cd49548 100644
--- a/src/consumer/DefaultMQPullConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPullConsumerImpl.cpp
@@ -63,7 +63,9 @@
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
+ LOG_INFO("###Current Pull Consumer@%s", getClientVersionString().c_str());
dealWithNameSpace();
+ showClientConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index dc02d9d..5034051 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -16,6 +16,7 @@
*/
#include "DefaultMQPushConsumer.h"
+#include <MQVersion.h>
#include "DefaultMQPushConsumerImpl.h"
namespace rocketmq {
@@ -34,7 +35,14 @@
void DefaultMQPushConsumer::shutdown() {
impl->shutdown();
}
-
+std::string DefaultMQPushConsumer::version() {
+ std::string versions = impl->getClientVersionString();
+ /*versions.append(", PROTOCOL VERSION: ")
+ .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion))
+ .append(", LANGUAGE: ")
+ .append(MQVersion::s_CurrentLanguage);*/
+ return versions;
+}
// ConsumeType DefaultMQPushConsumer::getConsumeType() {
// return impl->getConsumeType();
//}
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 54500bc..5fe6f9c 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -193,6 +193,7 @@
//<!***************************************************************************
static boost::mutex m_asyncCallbackLock;
+DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl() {}
DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(const string& groupname)
: m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
m_pOffsetStore(NULL),
@@ -307,8 +308,10 @@
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
+ LOG_WARN("###Current Push Consumer@%s", getClientVersionString().c_str());
// deal with name space before start
dealWithNameSpace();
+ logConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
@@ -1038,5 +1041,44 @@
return true;
}
+void DefaultMQPushConsumerImpl::logConfigs() {
+ showClientConfigs();
+
+ LOG_WARN("MessageModel:%d", m_messageModel);
+ LOG_WARN("MessageModel:%s", m_messageModel == BROADCASTING ? "BROADCASTING" : "CLUSTERING");
+
+ LOG_WARN("ConsumeFromWhere:%d", m_consumeFromWhere);
+ switch (m_consumeFromWhere) {
+ case CONSUME_FROM_FIRST_OFFSET:
+ LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_FIRST_OFFSET");
+ break;
+ case CONSUME_FROM_LAST_OFFSET:
+ LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_LAST_OFFSET");
+ break;
+
+ case CONSUME_FROM_TIMESTAMP:
+ LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_TIMESTAMP");
+ break;
+ case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
+ LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST");
+ break;
+ case CONSUME_FROM_MAX_OFFSET:
+ LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_MAX_OFFSET");
+ break;
+ case CONSUME_FROM_MIN_OFFSET:
+ LOG_WARN("ConsumeFromWhere:%s", "CONSUME_FROM_MAX_OFFSET");
+ break;
+ default:
+ LOG_WARN("ConsumeFromWhere:%s", "UnKnown.");
+ break;
+ }
+ LOG_WARN("ConsumeThreadCount:%d", m_consumeThreadCount);
+ LOG_WARN("ConsumeMessageBatchMaxSize:%d", m_consumeMessageBatchMaxSize);
+ LOG_WARN("MaxMsgCacheSizePerQueue:%d", m_maxMsgCacheSize);
+ LOG_WARN("MaxReconsumeTimes:%d", m_maxReconsumeTimes);
+ LOG_WARN("PullMsgThreadPoolNum:%d", m_pullMsgThreadPoolNum);
+ LOG_WARN("AsyncPullMode:%s", m_asyncPull ? "true" : "false");
+ LOG_WARN("AsyncPullTimeout:%d ms", m_asyncPullTimeout);
+}
//<!************************************************************************
} // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index f90b6cd..f4e4319 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -45,6 +45,7 @@
//<!***************************************************************************
class DefaultMQPushConsumerImpl : public MQConsumer {
public:
+ DefaultMQPushConsumerImpl();
DefaultMQPushConsumerImpl(const std::string& groupname);
void boost_asio_work();
virtual ~DefaultMQPushConsumerImpl();
@@ -133,6 +134,7 @@
void copySubscription();
void updateTopicSubscribeInfoWhenSubscriptionChanged();
bool dealWithNameSpace();
+ void logConfigs();
private:
uint64_t m_startTime;
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 5310df8..00a9b05 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -216,6 +216,7 @@
TransactionMQProducer* innerTransactionProducer;
LocalTransactionListenerInner* listenerInner;
int producerType;
+ char* version;
} DefaultProducer;
CProducer* CreateProducer(const char* groupId) {
if (groupId == NULL) {
@@ -224,6 +225,9 @@
DefaultProducer* defaultMQProducer = new DefaultProducer();
defaultMQProducer->producerType = CAPI_C_PRODUCER_TYPE_COMMON;
defaultMQProducer->innerProducer = new DefaultMQProducer(groupId);
+ defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH];
+ strncpy(defaultMQProducer->version, defaultMQProducer->innerProducer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
+ defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0;
defaultMQProducer->innerTransactionProducer = NULL;
defaultMQProducer->listenerInner = NULL;
return (CProducer*)defaultMQProducer;
@@ -236,6 +240,10 @@
DefaultProducer* defaultMQProducer = new DefaultProducer();
defaultMQProducer->producerType = CAPI_C_PRODUCER_TYPE_ORDERLY;
defaultMQProducer->innerProducer = new DefaultMQProducer(groupId);
+
+ defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH];
+ strncpy(defaultMQProducer->version, defaultMQProducer->innerProducer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
+ defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0;
defaultMQProducer->innerTransactionProducer = NULL;
defaultMQProducer->listenerInner = NULL;
return (CProducer*)defaultMQProducer;
@@ -252,6 +260,11 @@
defaultMQProducer->listenerInner =
new LocalTransactionListenerInner((CProducer*)defaultMQProducer, callback, userData);
defaultMQProducer->innerTransactionProducer->setTransactionListener(defaultMQProducer->listenerInner);
+
+ defaultMQProducer->version = new char[MAX_SDK_VERSION_LENGTH];
+ strncpy(defaultMQProducer->version, defaultMQProducer->innerTransactionProducer->version().c_str(),
+ MAX_SDK_VERSION_LENGTH - 1);
+ defaultMQProducer->version[MAX_SDK_VERSION_LENGTH - 1] = 0;
return (CProducer*)defaultMQProducer;
}
int DestroyProducer(CProducer* pProducer) {
@@ -259,6 +272,10 @@
return NULL_POINTER;
}
DefaultProducer* defaultMQProducer = (DefaultProducer*)pProducer;
+ if (defaultMQProducer->version != NULL) {
+ delete defaultMQProducer->version;
+ defaultMQProducer->version = NULL;
+ }
if (CAPI_C_PRODUCER_TYPE_TRANSACTION == defaultMQProducer->producerType) {
if (defaultMQProducer->innerTransactionProducer != NULL) {
delete defaultMQProducer->innerTransactionProducer;
@@ -307,6 +324,14 @@
}
return OK;
}
+const char* ShowProducerVersion(CProducer* producer) {
+ if (producer == NULL) {
+ return DEFAULT_SDK_VERSION;
+ }
+ DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;
+
+ return defaultMQProducer->version;
+}
int SetProducerNameServerAddress(CProducer* producer, const char* namesrv) {
if (producer == NULL) {
return NULL_POINTER;
diff --git a/src/extern/CPullConsumer.cpp b/src/extern/CPullConsumer.cpp
index f84a4bc..a0db242 100644
--- a/src/extern/CPullConsumer.cpp
+++ b/src/extern/CPullConsumer.cpp
@@ -27,12 +27,14 @@
#ifdef __cplusplus
extern "C" {
#endif
-
+char VERSION_FOR_PULL_CONSUMER[MAX_SDK_VERSION_LENGTH];
CPullConsumer* CreatePullConsumer(const char* groupId) {
if (groupId == NULL) {
return NULL;
}
DefaultMQPullConsumer* defaultMQPullConsumer = new DefaultMQPullConsumer(groupId);
+ strncpy(VERSION_FOR_PULL_CONSUMER, defaultMQPullConsumer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
+ VERSION_FOR_PULL_CONSUMER[MAX_SDK_VERSION_LENGTH - 1] = 0;
return (CPullConsumer*)defaultMQPullConsumer;
}
int DestroyPullConsumer(CPullConsumer* consumer) {
@@ -61,6 +63,13 @@
((DefaultMQPullConsumer*)consumer)->shutdown();
return OK;
}
+const char* ShowPullConsumerVersion(CPullConsumer* consumer) {
+ if (consumer == NULL) {
+ return NULL;
+ }
+ return VERSION_FOR_PULL_CONSUMER;
+}
+
int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId) {
if (consumer == NULL || groupId == NULL) {
return NULL_POINTER;
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 66ef937..5ee89ca 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -85,13 +85,16 @@
#ifdef __cplusplus
extern "C" {
#endif
-
+char VERSION_FOR_PUSH_CONSUMER[MAX_SDK_VERSION_LENGTH];
CPushConsumer* CreatePushConsumer(const char* groupId) {
if (groupId == NULL) {
return NULL;
}
DefaultMQPushConsumer* defaultMQPushConsumer = new DefaultMQPushConsumer(groupId);
defaultMQPushConsumer->setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);
+
+ strncpy(VERSION_FOR_PUSH_CONSUMER, defaultMQPushConsumer->version().c_str(), MAX_SDK_VERSION_LENGTH - 1);
+ VERSION_FOR_PUSH_CONSUMER[MAX_SDK_VERSION_LENGTH - 1] = 0;
return (CPushConsumer*)defaultMQPushConsumer;
}
int DestroyPushConsumer(CPushConsumer* consumer) {
@@ -120,6 +123,13 @@
((DefaultMQPushConsumer*)consumer)->shutdown();
return OK;
}
+
+const char* ShowPushConsumerVersion(CPushConsumer* consumer) {
+ if (consumer == NULL) {
+ return NULL;
+ }
+ return VERSION_FOR_PUSH_CONSUMER;
+}
int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId) {
if (consumer == NULL || groupId == NULL) {
return NULL_POINTER;
diff --git a/src/include/DefaultMQClient.h b/src/include/DefaultMQClient.h
index b1b51bd..a2e5ce5 100644
--- a/src/include/DefaultMQClient.h
+++ b/src/include/DefaultMQClient.h
@@ -41,6 +41,7 @@
public:
// clientid=processId-ipAddr@instanceName;
+ std::string getClientVersionString() const;
std::string getMQClientId() const;
const std::string& getNamesrvAddr() const;
void setNamesrvAddr(const std::string& namesrvAddr);
@@ -174,6 +175,7 @@
virtual void shutdown();
MQClientFactory* getFactory() const;
virtual bool isServiceStateOk();
+ void showClientConfigs();
protected:
std::string m_namesrvAddr;
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index f656b1e..1977299 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -16,6 +16,7 @@
*/
#include "DefaultMQProducer.h"
+#include <MQVersion.h>
#include "DefaultMQProducerImpl.h"
@@ -36,6 +37,17 @@
impl->shutdown();
}
+std::string DefaultMQProducer::version() {
+ std::string versions = impl->getClientVersionString();
+ /*
+ versions.append(", PROTOCOL VERSION: ")
+ .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion))
+ .append(", LANGUAGE: ")
+ .append(MQVersion::s_CurrentLanguage);
+ */
+ return versions;
+}
+
// start mqclient set
const std::string& DefaultMQProducer::getNamesrvAddr() const {
return impl->getNamesrvAddr();
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index aff238b..037f81a 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -63,8 +63,10 @@
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
+ LOG_WARN("###Current Producer@%s", getClientVersionString().c_str());
// we should deal with namespaced before start.
dealWithNameSpace();
+ logConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
@@ -634,5 +636,15 @@
}
return true;
}
+void DefaultMQProducerImpl::logConfigs() {
+ showClientConfigs();
+
+ LOG_WARN("SendMsgTimeout:%d ms", m_sendMsgTimeout);
+ LOG_WARN("CompressMsgBodyOverHowmuch:%d", m_compressMsgBodyOverHowmuch);
+ LOG_WARN("MaxMessageSize:%d", m_maxMessageSize);
+ LOG_WARN("CompressLevel:%d", m_compressLevel);
+ LOG_WARN("RetryTimes:%d", m_retryTimes);
+ LOG_WARN("RetryTimes4Async:%d", m_retryTimes4Async);
+}
//<!***************************************************************************
} // namespace rocketmq
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index c4d2393..65fe6af 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -103,6 +103,7 @@
bool tryToCompressMessage(MQMessage& msg);
BatchMessage buildBatchMessage(std::vector<MQMessage>& msgs);
bool dealWithNameSpace();
+ void logConfigs();
private:
int m_sendMsgTimeout;
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index fe2b855..c246a80 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -16,6 +16,7 @@
*/
#include "TransactionMQProducer.h"
+#include <MQVersion.h>
#include "TransactionMQProducerImpl.h"
@@ -35,7 +36,16 @@
void TransactionMQProducer::shutdown() {
impl->shutdown();
}
-
+std::string TransactionMQProducer::version() {
+ std::string versions = impl->getClientVersionString();
+ /*
+ versions.append(", PROTOCOL VERSION: ")
+ .append(MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion))
+ .append(", LANGUAGE: ")
+ .append(MQVersion::s_CurrentLanguage);
+ */
+ return versions;
+}
// start mqclient set
const std::string& TransactionMQProducer::getNamesrvAddr() const {
return impl->getNamesrvAddr();
diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/ConsumerRunningInfo.cpp
index 999686b..951cc4b 100644
--- a/src/protocol/ConsumerRunningInfo.cpp
+++ b/src/protocol/ConsumerRunningInfo.cpp
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#include "ConsumerRunningInfo.h"
#include "UtilAll.h"
@@ -23,6 +23,7 @@
const string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
const string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
const string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+const string ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION = "PROP_CLIENT_CORE_VERSION";
const string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
const map<string, string> ConsumerRunningInfo::getProperties() const {
@@ -82,6 +83,7 @@
outData[PROP_CONSUMER_START_TIMESTAMP] = properties[PROP_CONSUMER_START_TIMESTAMP];
outData[PROP_CONSUME_ORDERLY] = properties[PROP_CONSUME_ORDERLY];
outData[PROP_THREADPOOL_CORE_SIZE] = properties[PROP_THREADPOOL_CORE_SIZE];
+ outData[PROP_CLIENT_SDK_VERSION] = properties[PROP_CLIENT_SDK_VERSION];
Json::Value root;
root["jstack"] = jstack;
@@ -116,4 +118,4 @@
return finals;
}
-}
+} // namespace rocketmq
diff --git a/src/protocol/ConsumerRunningInfo.h b/src/protocol/ConsumerRunningInfo.h
index 3c83834..de0d4b2 100644
--- a/src/protocol/ConsumerRunningInfo.h
+++ b/src/protocol/ConsumerRunningInfo.h
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#ifndef __CONSUMERRUNNINGINFO_H__
#define __CONSUMERRUNNINGINFO_H__
@@ -38,6 +38,7 @@
static const string PROP_CONSUME_ORDERLY;
static const string PROP_CONSUME_TYPE;
static const string PROP_CLIENT_VERSION;
+ static const string PROP_CLIENT_SDK_VERSION;
static const string PROP_CONSUMER_START_TIMESTAMP;
public:
@@ -61,5 +62,5 @@
// map<string, ConsumeStatus> statusTable;
string jstack;
};
-}
+} // namespace rocketmq
#endif
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 4c708f7..294c319 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -28,6 +28,8 @@
namespace rocketmq {
//<!************************************************************************
+TcpRemotingClient::TcpRemotingClient()
+ : m_dispatchServiceWork(m_dispatchService), m_handleServiceWork(m_handleService) {}
TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
: m_dispatchThreadNum(1),
m_pullThreadNum(pullThreadNum),
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 66760b6..856026a 100644
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -36,6 +36,7 @@
class TcpRemotingClient {
public:
+ TcpRemotingClient();
TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
virtual ~TcpRemotingClient();
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
index 2c0687a..b334af3 100644
--- a/test/src/MQClientAPIImpTest.cpp
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -35,8 +35,7 @@
class MockTcpRemotingClient : public TcpRemotingClient {
public:
- MockTcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
- : TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
+ MockTcpRemotingClient() : TcpRemotingClient() {}
MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int));
@@ -49,60 +48,17 @@
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName)
- : MQClientAPIImpl(mqClientId,
- clientRemotingProcessor,
- pullThreadNum,
- tcpConnectTimeout,
- tcpTransportTryLockTimeout,
- unitName) {
- m_processor = clientRemotingProcessor;
- }
- ClientRemotingProcessor* m_processor;
- void reInitRemoteClient(TcpRemotingClient* client) {
- m_pRemotingClient.reset(client);
- m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE, m_processor);
- m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET, m_processor);
- m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT, m_processor);
- m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO, m_processor);
- m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED, m_processor);
- m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY, m_processor);
- }
-};
-class MockMQClientAPIImplUtil {
- public:
- static MockMQClientAPIImplUtil* GetInstance() {
- static MockMQClientAPIImplUtil instance;
- return &instance;
- }
- MockMQClientAPIImpl* GetGtestMockClientAPIImpl() {
- if (m_impl != nullptr) {
- return m_impl;
- }
- string cid = "testClientId";
- int ptN = 1;
- uint64_t tct = 3000;
- uint64_t ttt = 3000;
- string un = "central";
- SessionCredentials sc;
- ClientRemotingProcessor* pp = new ClientRemotingProcessor(nullptr);
- MockMQClientAPIImpl* impl = new MockMQClientAPIImpl(cid, pp, ptN, tct, ttt, un);
- MockTcpRemotingClient* pClient = new MockTcpRemotingClient(ptN, tct, ttt);
- impl->reInitRemoteClient(pClient);
- m_impl = impl;
- m_pClient = pClient;
- return impl;
- }
- MockTcpRemotingClient* GetGtestMockRemotingClient() { return m_pClient; }
- MockMQClientAPIImpl* m_impl = nullptr;
- MockTcpRemotingClient* m_pClient = nullptr;
+ : MQClientAPIImpl(mqClientId) {}
+ void reInitRemoteClient(TcpRemotingClient* client) { m_pRemotingClient.reset(client); }
};
TEST(MQClientAPIImplTest, getMaxOffset) {
SessionCredentials sc;
- MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
- MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
+ impl->reInitRemoteClient(pClient);
GetMaxOffsetResponseHeader* pHead = new GetMaxOffsetResponseHeader();
pHead->offset = 4096;
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
@@ -120,10 +76,11 @@
TEST(MQClientAPIImplTest, getMinOffset) {
SessionCredentials sc;
- MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
- MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
+ impl->reInitRemoteClient(pClient);
GetMinOffsetResponseHeader* pHead = new GetMinOffsetResponseHeader();
pHead->offset = 2048;
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
@@ -154,11 +111,11 @@
TEST(MQClientAPIImplTest, sendMessage) {
string cid = "testClientId";
SessionCredentials sc;
- MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
- MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
-
+ impl->reInitRemoteClient(pClient);
SendMessageResponseHeader* pHead = new SendMessageResponseHeader();
pHead->msgId = "MessageID";
pHead->queueId = 1;
@@ -238,10 +195,11 @@
TEST(MQClientAPIImplTest, consumerSendMessageBack) {
SessionCredentials sc;
MQMessageExt msg;
- MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+ MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
- MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+ MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
+ impl->reInitRemoteClient(pClient);
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr);
EXPECT_CALL(*pClient, invokeSync(_, _, _))
diff --git a/test/src/MQClientFactoryTest.cpp b/test/src/MQClientFactoryTest.cpp
index 11490e5..eb78cf7 100644
--- a/test/src/MQClientFactoryTest.cpp
+++ b/test/src/MQClientFactoryTest.cpp
@@ -20,17 +20,28 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
+#include "ConsumerRunningInfo.h"
+#include "DefaultMQPushConsumerImpl.h"
#include "MQClientFactory.h"
using namespace std;
using namespace rocketmq;
+using rocketmq::ConsumerRunningInfo;
+using rocketmq::DefaultMQPushConsumerImpl;
using rocketmq::MQClientFactory;
using rocketmq::TopicRouteData;
using testing::_;
using ::testing::InitGoogleMock;
using ::testing::InitGoogleTest;
+using testing::Mock;
using testing::Return;
+class MockPushConsumerImpl : public DefaultMQPushConsumerImpl {
+ public:
+ MockPushConsumerImpl(const std::string& groupname) : DefaultMQPushConsumerImpl() {}
+ MOCK_METHOD0(getConsumerRunningInfo, ConsumerRunningInfo*());
+};
+
class MockMQClientAPIImpl : public MQClientAPIImpl {
public:
MockMQClientAPIImpl(const string& mqClientId,
@@ -39,12 +50,7 @@
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName)
- : MQClientAPIImpl(mqClientId,
- clientRemotingProcessor,
- pullThreadNum,
- tcpConnectTimeout,
- tcpTransportTryLockTimeout,
- unitName) {}
+ : MQClientAPIImpl(mqClientId) {}
MOCK_METHOD5(getMinOffset, int64(const string&, const string&, int, int, const SessionCredentials&));
MOCK_METHOD3(getTopicRouteInfoFromNameServer, TopicRouteData*(const string&, int, const SessionCredentials&));
@@ -56,10 +62,11 @@
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName)
- : MQClientFactory(mqClientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName) {}
+ : MQClientFactory(mqClientId) {}
void reInitClientImpl(MQClientAPIImpl* pImpl) { m_pClientAPIImpl.reset(pImpl); }
- void reInitRemotingProcessor(ClientRemotingProcessor* pImpl) { m_pClientRemotingProcessor.reset(pImpl); }
- ClientRemotingProcessor* getRemotingProcessor() { return m_pClientRemotingProcessor.release(); }
+ void addTestConsumer(const string& consumerName, MQConsumer* pMQConsumer) {
+ addConsumerToTable(consumerName, pMQConsumer);
+ }
};
TEST(MQClientFactoryTest, minOffset) {
@@ -70,8 +77,8 @@
string unitName = "central";
MockMQClientFactory* factory =
new MockMQClientFactory(clientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
- MockMQClientAPIImpl* pImpl = new MockMQClientAPIImpl(clientId, factory->getRemotingProcessor(), pullThreadNum,
- tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
+ MockMQClientAPIImpl* pImpl = new MockMQClientAPIImpl(clientId, nullptr, pullThreadNum, tcpConnectTimeout,
+ tcpTransportTryLockTimeout, unitName);
factory->reInitClientImpl(pImpl);
MQMessageQueue mq;
mq.setTopic("testTopic");
@@ -100,6 +107,25 @@
delete factory;
}
+TEST(MQClientFactoryTest, consumerRunningInfo) {
+ string clientId = "testClientId";
+ int pullThreadNum = 1;
+ uint64_t tcpConnectTimeout = 3000;
+ uint64_t tcpTransportTryLockTimeout = 3000;
+ string unitName = "central";
+ MockMQClientFactory* factory =
+ new MockMQClientFactory(clientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
+ MockPushConsumerImpl* mockPushConsumer = new MockPushConsumerImpl(clientId);
+ Mock::AllowLeak(mockPushConsumer);
+ factory->addTestConsumer(clientId, mockPushConsumer);
+ ConsumerRunningInfo* info = new ConsumerRunningInfo();
+ info->setJstack("Hello,JStack");
+ EXPECT_CALL(*mockPushConsumer, getConsumerRunningInfo()).Times(1).WillOnce(Return(info));
+ ConsumerRunningInfo* info2 = factory->consumerRunningInfo(clientId);
+ EXPECT_EQ(info2->getJstack(), "Hello,JStack");
+ delete factory;
+}
+
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
return RUN_ALL_TESTS();
diff --git a/test/src/extern/CProducerTest.cpp b/test/src/extern/CProducerTest.cpp
index 14f9999..7798fe6 100644
--- a/test/src/extern/CProducerTest.cpp
+++ b/test/src/extern/CProducerTest.cpp
@@ -247,7 +247,21 @@
EXPECT_EQ(SetProducerLogLevel(NULL, E_LOG_LEVEL_FATAL), NULL_POINTER);
EXPECT_EQ(DestroyProducer(NULL), NULL_POINTER);
}
+TEST(cProducer, version) {
+ CProducer* cProducer = CreateProducer("groupTestVersion");
+ EXPECT_TRUE(cProducer != NULL);
+ string version(ShowProducerVersion(cProducer));
+ EXPECT_GT(version.length(), 0);
+ CProducer* cProducer2 = CreateOrderlyProducer("orderGroupTestVersion");
+ EXPECT_TRUE(cProducer2 != NULL);
+ string version2(ShowProducerVersion(cProducer2));
+ EXPECT_GT(version2.length(), 0);
+ CProducer* cProducer3 = CreateTransactionProducer("tranGroupTestVersion", NULL, NULL);
+ EXPECT_TRUE(cProducer3 != NULL);
+ string version3(ShowProducerVersion(cProducer3));
+ EXPECT_GT(version3.length(), 0);
+}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(throw_on_failure) = true;
diff --git a/test/src/extern/CPullConsumerTest.cpp b/test/src/extern/CPullConsumerTest.cpp
index bbdbbae..0ebfde6 100644
--- a/test/src/extern/CPullConsumerTest.cpp
+++ b/test/src/extern/CPullConsumerTest.cpp
@@ -196,7 +196,12 @@
EXPECT_EQ(StartPullConsumer(NULL), NULL_POINTER);
EXPECT_EQ(ShutdownPullConsumer(NULL), NULL_POINTER);
}
-
+TEST(cpullConsumer, version) {
+ CPullConsumer* pullConsumer = CreatePullConsumer("groupTestVersion");
+ EXPECT_TRUE(pullConsumer != NULL);
+ string version(ShowPullConsumerVersion(pullConsumer));
+ EXPECT_GT(version.length(), 0);
+}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(throw_on_failure) = true;
diff --git a/test/src/extern/CPushConsumerTest.cpp b/test/src/extern/CPushConsumerTest.cpp
index a2d2fc9..2eff2e7 100644
--- a/test/src/extern/CPushConsumerTest.cpp
+++ b/test/src/extern/CPushConsumerTest.cpp
@@ -147,7 +147,12 @@
EXPECT_EQ(SetPushConsumerLogLevel(NULL, E_LOG_LEVEL_LEVEL_NUM), NULL_POINTER);
EXPECT_EQ(SetPushConsumerMessageModel(NULL, BROADCASTING), NULL_POINTER);
}
-
+TEST(cPushComsumer, version) {
+ CPushConsumer* pushConsumer = CreatePushConsumer("groupTestVersion");
+ EXPECT_TRUE(pushConsumer != NULL);
+ string version(ShowPushConsumerVersion(pushConsumer));
+ EXPECT_GT(version.length(), 0);
+}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(filter) = "cPushComsumer.*";
diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp
index 70b305f..8642e49 100644
--- a/test/src/protocol/ConsumerRunningInfoTest.cpp
+++ b/test/src/protocol/ConsumerRunningInfoTest.cpp
@@ -99,6 +99,7 @@
info.setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "consume_type");
info.setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, "client_version");
info.setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, "127");
+ info.setProperty(ConsumerRunningInfo::PROP_CLIENT_SDK_VERSION, "sdk_version");
// encode
string outStr = info.encode();