[ISSUE#67] Resolve CreateMessage but without DestroyMessage for example AsyncProducer and SyncProducer (#68)
Resolve CreateMessage but without DestroyMessage, for example, AsyncProducer and SyncProducer
diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c
index 56496b5..db4a957 100644
--- a/example/CAsyncProducer.c
+++ b/example/CAsyncProducer.c
@@ -16,19 +16,15 @@
*/
#include <stdio.h>
-
#include "CProducer.h"
#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"
-
#ifdef _WIN32
#include <windows.h>
#else
-
#include <unistd.h>
#include <memory.h>
-
#endif
void thread_sleep(unsigned milliseconds) {
@@ -39,48 +35,47 @@
#endif
}
-void sendSuccessCallback(CSendResult result){
- printf("Msg Send ID:%s\n", result.msgId);
+void SendSuccessCallback(CSendResult result){
+ printf("async send success, msgid:%s\n", result.msgId);
}
-void sendExceptionCallback(CMQException e){
- printf("asyn send exception error : %d\n" , e.error);
- printf("asyn send exception msg : %s\n" , e.msg);
- printf("asyn send exception file : %s\n" , e.file);
- printf("asyn send exception line : %d\n" , e.line);
+void SendExceptionCallback(CMQException e){
+ char msg[1024];
+ snprintf(msg, sizeof(msg), "error:%d, msg:%s, file:%s:%d", e.error, e.msg, e.file, e.line);
+ printf("async send exception %s\n", msg);
}
-void startSendMessage(CProducer *producer) {
+void StartSendMessage(CProducer *producer) {
int i = 0;
- char DestMsg[256];
+ int ret_code = 0;
+ char body[128];
CMessage *msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
- CSendResult result;
for (i = 0; i < 10; i++) {
- printf("send one message : %d\n", i);
- memset(DestMsg, 0, sizeof(DestMsg));
- snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
- SetMessageBody(msg, DestMsg);
- int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback);
- printf("Async send return code: %d\n", code);
+ memset(body, 0, sizeof(body));
+ snprintf(body, sizeof(body), "new message body, index %d", i);
+ SetMessageBody(msg, body);
+ ret_code = SendMessageAsync(producer, msg, SendSuccessCallback , SendExceptionCallback);
+ printf("async send message[%d] return code: %d\n", i, ret_code);
thread_sleep(1000);
}
+ DestroyMessage(msg);
}
void CreateProducerAndStartSendMessage(int i){
- printf("Producer Initializing.....\n");
- CProducer *producer = CreateProducer("Group_producer");
- SetProducerNameServerAddress(producer, "127.0.0.1:9876");
- if(i == 1){
- SetProducerSendMsgTimeout(producer , 3);
- }
- StartProducer(producer);
- printf("Producer start.....\n");
- startSendMessage(producer);
- ShutdownProducer(producer);
- DestroyProducer(producer);
- printf("Producer Shutdown!\n");
+ printf("Producer Initializing.....\n");
+ CProducer *producer = CreateProducer("Group_producer");
+ SetProducerNameServerAddress(producer, "127.0.0.1:9876");
+ if(i == 1){
+ SetProducerSendMsgTimeout(producer , 3);
+ }
+ StartProducer(producer);
+ printf("Producer start.....\n");
+ StartSendMessage(producer);
+ ShutdownProducer(producer);
+ DestroyProducer(producer);
+ printf("Producer Shutdown!\n");
}
int main(int argc, char *argv[]) {
@@ -89,7 +84,6 @@
printf("Send Async exceptionCallback.....\n");
CreateProducerAndStartSendMessage(1);
-
return 0;
}
diff --git a/example/Producer.c b/example/Producer.c
index 5feedd6..97fe6ed 100644
--- a/example/Producer.c
+++ b/example/Producer.c
@@ -16,19 +16,15 @@
*/
#include <stdio.h>
-
#include "CProducer.h"
#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"
-
#ifdef _WIN32
#include <windows.h>
#else
-
#include <unistd.h>
#include <memory.h>
-
#endif
void thread_sleep(unsigned milliseconds) {
@@ -39,33 +35,30 @@
#endif
}
-void startSendMessage(CProducer *producer) {
+void StartSendMessage(CProducer *producer) {
int i = 0;
- char DestMsg[256];
+ char body[256];
CMessage *msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
CSendResult result;
for (i = 0; i < 10; i++) {
- printf("send one message : %d\n", i);
- memset(DestMsg, 0, sizeof(DestMsg));
- snprintf(DestMsg, 255, "New message body: index %d", i);
- SetMessageBody(msg, DestMsg);
+ memset(body, 0, sizeof(body));
+ SetMessageBody(msg, body);
SendMessageSync(producer, msg, &result);
- printf("Msg Send ID:%s\n", result.msgId);
+ printf("send message[%d] result status:%d, msgId:%s\n", i, (int)result.sendStatus, result.msgId);
thread_sleep(1000);
}
+ DestroyMessage(msg);
}
-
int main(int argc, char *argv[]) {
printf("Producer Initializing.....\n");
-
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
StartProducer(producer);
printf("Producer start.....\n");
- startSendMessage(producer);
+ StartSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c
index 8be2207..657370f 100644
--- a/example/PullConsumeMessage.c
+++ b/example/PullConsumeMessage.c
@@ -16,20 +16,16 @@
*/
#include <stdio.h>
-
#include "CPullConsumer.h"
#include "CCommon.h"
#include "CMessageExt.h"
#include "CPullResult.h"
#include "CMessageQueue.h"
-
#ifdef _WIN32
#include <windows.h>
#else
-
#include <unistd.h>
#include <memory.h>
-
#endif
void thread_sleep(unsigned milliseconds) {
diff --git a/example/PushConsumeMessage.c b/example/PushConsumeMessage.c
index 0e2906d..81bcbc1 100644
--- a/example/PushConsumeMessage.c
+++ b/example/PushConsumeMessage.c
@@ -16,18 +16,14 @@
*/
#include <stdio.h>
-
#include "CPushConsumer.h"
#include "CCommon.h"
#include "CMessageExt.h"
-
#ifdef _WIN32
#include <windows.h>
#else
-
#include <unistd.h>
#include <memory.h>
-
#endif
void thread_sleep(unsigned milliseconds) {
@@ -47,7 +43,6 @@
return E_CONSUME_SUCCESS;
}
-
int main(int argc, char *argv[]) {
int i = 0;
printf("PushConsumer Initializing....\n");
diff --git a/example/common.h b/example/common.h
index 4ce0181..21d2ba0 100755
--- a/example/common.h
+++ b/example/common.h
@@ -65,20 +65,20 @@
public:
TpsReportService() : tps_interval_(1), quit_flag_(false), tps_count_(0) {}
void start() {
- if (tps_thread_ == NULL) {
- std::cout << "tps_thread_ is null" << std::endl;
- return;
- }
+ if (tps_thread_ == NULL) {
+ std::cout << "tps_thread_ is null" << std::endl;
+ return;
+ }
tps_thread_.reset(
new boost::thread(boost::bind(&TpsReportService::TpsReport, this)));
}
~TpsReportService() {
quit_flag_.store(true);
- if (tps_thread_ == NULL) {
- std::cout << "tps_thread_ is null" << std::endl;
- return;
- }
+ if (tps_thread_ == NULL) {
+ std::cout << "tps_thread_ is null" << std::endl;
+ return;
+ }
if (tps_thread_->joinable()) tps_thread_->join();
}
@@ -99,12 +99,14 @@
boost::atomic<long> tps_count_;
};
+/*
static void PrintResult(rocketmq::SendResult* result) {
std::cout << "sendresult = " << result->getSendStatus()
<< ", msgid = " << result->getMsgId()
<< ", queueOffset = " << result->getQueueOffset() << ","
<< result->getMessageQueue().toString() << endl;
}
+*/
void PrintPullResult(rocketmq::PullResult* result) {
std::cout << result->toString() << std::endl;
diff --git a/include/CCommon.h b/include/CCommon.h
index eb9ffbc..85c5aaf 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -36,7 +36,7 @@
PRODUCER_SEND_SYNC_FAILED = 11,
PRODUCER_SEND_ONEWAY_FAILED = 12,
PRODUCER_SEND_ORDERLY_FAILED = 13,
- PRODUCER_SEND_ASYNC_FAILED = 14,
+ PRODUCER_SEND_ASYNC_FAILED = 14,
PUSHCONSUMER_ERROR_CODE_START = 20,
PUSHCONSUMER_START_FAILED = 20,
@@ -57,7 +57,6 @@
E_LOG_LEVEL_LEVEL_NUM = 7
} CLogLevel;
-
#ifdef WIN32
#ifdef ROCKETMQCLIENT_EXPORTS
#ifdef _WINDLL
diff --git a/include/CMQException.h b/include/CMQException.h
index da26edd..69706e8 100644
--- a/include/CMQException.h
+++ b/include/CMQException.h
@@ -14,8 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
#ifndef __C_MQEXCPTION_H__
#define __C_MQEXCPTION_H__
#include "CCommon.h"
@@ -24,14 +22,15 @@
extern "C" {
#endif
-#define MAX_EXEPTION_CHAR_LENGTH 512
-
+#define MAX_EXEPTION_MSG_LENGTH 512
+#define MAX_EXEPTION_FILE_LENGTH 256
+#define MAX_EXEPTION_TYPE_LENGTH 128
typedef struct _CMQException_{
- int error;
- int line;
- char file[MAX_EXEPTION_CHAR_LENGTH];
- char msg[MAX_EXEPTION_CHAR_LENGTH];
- char type[MAX_EXEPTION_CHAR_LENGTH];
+ int error;
+ int line;
+ char file[MAX_EXEPTION_FILE_LENGTH];
+ char msg[MAX_EXEPTION_MSG_LENGTH];
+ char type[MAX_EXEPTION_TYPE_LENGTH];
} CMQException;
diff --git a/include/MQClientException.h b/include/MQClientException.h
index 2c3d2ed..513621d 100755
--- a/include/MQClientException.h
+++ b/include/MQClientException.h
@@ -21,13 +21,10 @@
#include <ostream>
#include <sstream>
#include <string>
-
#include <string.h>
#include "RocketMQClient.h"
#include "CCommon.h"
-
-
namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQException : public std::exception {
@@ -56,17 +53,12 @@
} catch (...) {
}
}
-
+
virtual ~MQException() throw() {}
-
const char* what() const throw() { return m_msg.c_str(); }
-
int GetError() const throw() { return m_error; }
-
virtual const char* GetType() const throw() { return m_type.c_str(); }
-
int GetLine() { return m_line;}
-
const char* GetFile() { return m_file.c_str(); }
protected:
@@ -77,7 +69,6 @@
std::string m_type;
};
-
inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
os << "Type: " << e.GetType() << " , " << e.what();
return os;
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index c238e59..edcc42a 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -17,17 +17,14 @@
#include "DefaultMQProducer.h"
#include "AsyncCallback.h"
-
#include "CProducer.h"
#include "CCommon.h"
#include "CSendResult.h"
#include "CMessage.h"
#include "CMQException.h"
-
#include <string.h>
#include <typeinfo>
-
#ifdef __cplusplus
extern "C" {
#endif
@@ -54,34 +51,32 @@
class CSendCallback : public AutoDeleteSendCallBack{
public:
- CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
- m_cSendSuccessCallback = cSendSuccessCallback;
- m_cSendExceptionCallback = cSendExceptionCallback;
- }
- virtual ~CSendCallback(){}
- virtual void onSuccess(SendResult& sendResult) {
- CSendResult result;
- result.sendStatus = CSendStatus((int) sendResult.getSendStatus());
- result.offset = sendResult.getQueueOffset();
- strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
- result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
- m_cSendSuccessCallback(result);
-
- }
+ CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+ m_cSendSuccessCallback = cSendSuccessCallback;
+ m_cSendExceptionCallback = cSendExceptionCallback;
+ }
+ virtual ~CSendCallback(){}
+ virtual void onSuccess(SendResult& sendResult) {
+ CSendResult result;
+ result.sendStatus = CSendStatus((int) sendResult.getSendStatus());
+ result.offset = sendResult.getQueueOffset();
+ strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
+ result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ m_cSendSuccessCallback(result);
+ }
virtual void onException(MQException& e) {
CMQException exception;
exception.error = e.GetError();
exception.line = e.GetLine();
- strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1);
- strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1);
- m_cSendExceptionCallback( exception );
+ strncpy(exception.msg, e.what(), MAX_EXEPTION_MSG_LENGTH - 1);
+ strncpy(exception.file, e.GetFile(), MAX_EXEPTION_FILE_LENGTH - 1);
+ m_cSendExceptionCallback( exception );
}
private:
- CSendSuccessCallback m_cSendSuccessCallback;
- CSendExceptionCallback m_cSendExceptionCallback;
+ CSendSuccessCallback m_cSendSuccessCallback;
+ CSendExceptionCallback m_cSendExceptionCallback;
};
-
CProducer *CreateProducer(const char *groupId) {
if (groupId == NULL) {
return NULL;
@@ -163,28 +158,28 @@
return OK;
}
-int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
- if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
- return NULL_POINTER;
- }
- DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
- MQMessage *message = (MQMessage *) msg;
- CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback);
+int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback){
+ if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
+ MQMessage *message = (MQMessage *) msg;
+ CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback);
- try {
- defaultMQProducer->send(*message ,cSendCallback);
- } catch (exception &e) {
- if(cSendCallback != NULL){
- if(typeid(e) == typeid( MQException )){
- MQException &mqe = (MQException &)e;
- cSendCallback->onException( mqe );
- }
- delete cSendCallback;
- cSendCallback = NULL;
- }
- return PRODUCER_SEND_ASYNC_FAILED;
- }
- return OK;
+ try {
+ defaultMQProducer->send(*message ,cSendCallback);
+ } catch (exception &e) {
+ if(cSendCallback != NULL){
+ if(typeid(e) == typeid( MQException )){
+ MQException &mqe = (MQException &)e;
+ cSendCallback->onException( mqe );
+ }
+ delete cSendCallback;
+ cSendCallback = NULL;
+ }
+ return PRODUCER_SEND_ASYNC_FAILED;
+ }
+ return OK;
}
int SendMessageOneway(CProducer *producer, CMessage *msg) {
diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp
index e310cf4..571c8e5 100644
--- a/test/src/UrlTest.cpp
+++ b/test/src/UrlTest.cpp
@@ -19,15 +19,13 @@
#include "TopicConfig.h"
#include "gtest/gtest.h"
#include "gmock/gmock.h"
-
-#include <stdio.h>
-
#include "CProducer.h"
#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"
#include "CMQException.h"
#include <unistd.h>
+#include <stdio.h>
using namespace std;
using ::testing::InitGoogleTest;
@@ -38,37 +36,33 @@
class MockTopicConfig : public TopicConfig{
public:
- MOCK_METHOD0(getReadQueueNums , int());
+ MOCK_METHOD0(getReadQueueNums , int());
};
-
TEST(Url, Url) {
- Url url_s("172.17.0.2:9876");
- EXPECT_EQ(url_s.protocol_ , "172.17.0.2:9876");
+ Url url_s("172.17.0.2:9876");
+ EXPECT_EQ(url_s.protocol_ , "172.17.0.2:9876");
- Url url_z("https://www.aliyun.com/RocketMQ?5.0");
- EXPECT_EQ(url_z.protocol_ , "https");
- EXPECT_EQ(url_z.host_ , "www.aliyun.com");
- EXPECT_EQ(url_z.port_ , "80");
- EXPECT_EQ(url_z.path_ , "/RocketMQ");
- EXPECT_EQ(url_z.query_ , "5.0");
+ Url url_z("https://www.aliyun.com/RocketMQ?5.0");
+ EXPECT_EQ(url_z.protocol_ , "https");
+ EXPECT_EQ(url_z.host_ , "www.aliyun.com");
+ EXPECT_EQ(url_z.port_ , "80");
+ EXPECT_EQ(url_z.path_ , "/RocketMQ");
+ EXPECT_EQ(url_z.query_ , "5.0");
- Url url_path("https://www.aliyun.com:9876/RocketMQ?5.0");
- EXPECT_EQ(url_path.port_ , "9876");
- MockTopicConfig topicConfig;
- EXPECT_CALL(topicConfig , getReadQueueNums()).WillRepeatedly(Return(-1));
- int nums = topicConfig.getReadQueueNums();
- cout << nums << endl;
-
+ Url url_path("https://www.aliyun.com:9876/RocketMQ?5.0");
+ EXPECT_EQ(url_path.port_ , "9876");
+ MockTopicConfig topicConfig;
+ EXPECT_CALL(topicConfig , getReadQueueNums()).WillRepeatedly(Return(-1));
+ int nums = topicConfig.getReadQueueNums();
+ cout << nums << endl;
}
-
-
int main(int argc, char* argv[]) {
- InitGoogleMock(&argc, argv);
+ InitGoogleMock(&argc, argv);
- testing::GTEST_FLAG(filter) = "Url.Url";
- int itestts = RUN_ALL_TESTS();
- printf("i %d" , itestts);
- return itestts;
+ testing::GTEST_FLAG(filter) = "Url.Url";
+ int itestts = RUN_ALL_TESTS();
+ printf("RUN_ALL_TESTS return %d" , itestts);
+ return itestts;
}