feat(trace): add message trace shift for C style apis. (#281)
diff --git a/include/CCommon.h b/include/CCommon.h
index fa6edb9..ae8a9e1 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -83,6 +83,7 @@
#endif
typedef enum _CMessageModel_ { BROADCASTING, CLUSTERING } CMessageModel;
+typedef enum _CTraceModel_ { OPEN, CLOSE } CTraceModel;
#ifdef __cplusplus
}
diff --git a/include/CProducer.h b/include/CProducer.h
index fa6a730..296b13f 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -63,6 +63,7 @@
ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeout);
ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
+ROCKETMQCLIENT_API int SetProducerMessageTrace(CProducer* consumer, CTraceModel openTrace);
ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h
index e2bce18..5ed83f3 100644
--- a/include/CPushConsumer.h
+++ b/include/CPushConsumer.h
@@ -59,6 +59,7 @@
ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel);
ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize);
ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb);
+ROCKETMQCLIENT_API int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace);
#ifdef __cplusplus
}
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
index f4c0281..784de6c 100644
--- a/include/TransactionMQProducer.h
+++ b/include/TransactionMQProducer.h
@@ -79,7 +79,8 @@
void setLogLevel(elogLevel inputLevel);
elogLevel getLogLevel();
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
-
+ void setMessageTrace(bool messageTrace);
+ bool getMessageTrace() const;
std::shared_ptr<TransactionListener> getTransactionListener();
void setTransactionListener(TransactionListener* listener);
TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 00a9b05..0b139e2 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -797,6 +797,24 @@
}
return OK;
}
+int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;
+ bool messageTrace = openTrace == OPEN ? true : false;
+ try {
+ if (CAPI_C_PRODUCER_TYPE_TRANSACTION == defaultMQProducer->producerType) {
+ defaultMQProducer->innerTransactionProducer->setMessageTrace(messageTrace);
+ } else {
+ defaultMQProducer->innerProducer->setMessageTrace(messageTrace);
+ }
+ } catch (exception& e) {
+ MQClientErrorContainer::setErr(string(e.what()));
+ return PRODUCER_START_FAILED;
+ }
+ return OK;
+}
#ifdef __cplusplus
};
#endif
diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp
index 5ee89ca..77da3af 100644
--- a/src/extern/CPushConsumer.cpp
+++ b/src/extern/CPushConsumer.cpp
@@ -296,7 +296,14 @@
((DefaultMQPushConsumer*)consumer)->setLogLevel((elogLevel)level);
return OK;
}
-
+int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ bool messageTrace = openTrace == OPEN ? true : false;
+ ((DefaultMQPushConsumer*)consumer)->setMessageTrace(messageTrace);
+ return OK;
+}
#ifdef __cplusplus
};
#endif
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
index c246a80..97ed1b3 100644
--- a/src/producer/TransactionMQProducer.cpp
+++ b/src/producer/TransactionMQProducer.cpp
@@ -160,7 +160,12 @@
const std::string& TransactionMQProducer::getUnitName() const {
return impl->getUnitName();
}
-
+void TransactionMQProducer::setMessageTrace(bool messageTrace) {
+ impl->setMessageTrace(messageTrace);
+}
+bool TransactionMQProducer::getMessageTrace() const {
+ return impl->getMessageTrace();
+}
std::shared_ptr<TransactionListener> TransactionMQProducer::getTransactionListener() {
return impl->getTransactionListener();
}
diff --git a/test/src/extern/CProducerTest.cpp b/test/src/extern/CProducerTest.cpp
index 7798fe6..9a5156f 100644
--- a/test/src/extern/CProducerTest.cpp
+++ b/test/src/extern/CProducerTest.cpp
@@ -228,6 +228,10 @@
EXPECT_EQ(SetProducerSessionCredentials(cProducer, "accessKey", "secretKey", "channel"), OK);
SessionCredentials sessionCredentials = defaultMQProducer->getSessionCredentials();
EXPECT_EQ(sessionCredentials.getAccessKey(), "accessKey");
+
+ EXPECT_EQ(SetProducerMessageTrace(cProducer, OPEN), OK);
+ EXPECT_EQ(defaultMQProducer->getMessageTrace(), true);
+
Mock::AllowLeak(defaultMQProducer);
}
diff --git a/test/src/extern/CPushConsumerTest.cpp b/test/src/extern/CPushConsumerTest.cpp
index 2eff2e7..6b516ab 100644
--- a/test/src/extern/CPushConsumerTest.cpp
+++ b/test/src/extern/CPushConsumerTest.cpp
@@ -118,6 +118,8 @@
EXPECT_EQ(SetPushConsumerMessageModel(cpushConsumer, BROADCASTING), OK);
EXPECT_EQ(mqPushConsumer->getMessageModel(), MessageModel::BROADCASTING);
+ EXPECT_EQ(SetPushConsumerMessageTrace(cpushConsumer, CLOSE), OK);
+ EXPECT_EQ(mqPushConsumer->getMessageTrace(), false);
Mock::AllowLeak(mqPushConsumer);
}