Realization C asynSend (#65)
Realization C asynSend
diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c
new file mode 100644
index 0000000..56496b5
--- /dev/null
+++ b/example/CAsyncProducer.c
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+
+#ifdef _WIN32
+#include <windows.h>
+#else
+
+#include <unistd.h>
+#include <memory.h>
+
+#endif
+
+void thread_sleep(unsigned milliseconds) {
+#ifdef _WIN32
+ Sleep(milliseconds);
+#else
+ usleep(milliseconds * 1000); // takes microseconds
+#endif
+}
+
+void sendSuccessCallback(CSendResult result){
+ printf("Msg Send ID:%s\n", result.msgId);
+}
+
+void sendExceptionCallback(CMQException e){
+ printf("asyn send exception error : %d\n" , e.error);
+ printf("asyn send exception msg : %s\n" , e.msg);
+ printf("asyn send exception file : %s\n" , e.file);
+ printf("asyn send exception line : %d\n" , e.line);
+}
+
+void startSendMessage(CProducer *producer) {
+ int i = 0;
+ char DestMsg[256];
+ CMessage *msg = CreateMessage("T_TestTopic");
+ SetMessageTags(msg, "Test_Tag");
+ SetMessageKeys(msg, "Test_Keys");
+ CSendResult result;
+ for (i = 0; i < 10; i++) {
+ printf("send one message : %d\n", i);
+ memset(DestMsg, 0, sizeof(DestMsg));
+ snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
+ SetMessageBody(msg, DestMsg);
+ int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback);
+ printf("Async send return code: %d\n", code);
+ thread_sleep(1000);
+ }
+}
+
+void CreateProducerAndStartSendMessage(int i){
+ printf("Producer Initializing.....\n");
+ CProducer *producer = CreateProducer("Group_producer");
+ SetProducerNameServerAddress(producer, "127.0.0.1:9876");
+ if(i == 1){
+ SetProducerSendMsgTimeout(producer , 3);
+ }
+ StartProducer(producer);
+ printf("Producer start.....\n");
+ startSendMessage(producer);
+ ShutdownProducer(producer);
+ DestroyProducer(producer);
+ printf("Producer Shutdown!\n");
+}
+
+int main(int argc, char *argv[]) {
+ printf("Send Async successCallback.....\n");
+ CreateProducerAndStartSendMessage(0);
+
+ printf("Send Async exceptionCallback.....\n");
+ CreateProducerAndStartSendMessage(1);
+
+ return 0;
+}
+
diff --git a/example/Producer.c b/example/Producer.c
index cef8383..5feedd6 100644
--- a/example/Producer.c
+++ b/example/Producer.c
@@ -62,11 +62,10 @@
printf("Producer Initializing.....\n");
CProducer *producer = CreateProducer("Group_producer");
- SetProducerNameServerAddress(producer, "172.17.0.2:9876");
+ SetProducerNameServerAddress(producer, "127.0.0.1:9876");
StartProducer(producer);
printf("Producer start.....\n");
startSendMessage(producer);
-
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
diff --git a/example/PushConsumer.cpp b/example/PushConsumer.cpp
index d5ce021..119b7f2 100755
--- a/example/PushConsumer.cpp
+++ b/example/PushConsumer.cpp
@@ -81,7 +81,7 @@
if (info.syncpush) consumer.setAsyncPull(false); // set sync pull
if (info.broadcasting) {
- consumer.setMessageModel(BROADCASTING);
+ consumer.setMessageModel(rocketmq::BROADCASTING);
}
consumer.setInstanceName(info.groupname);
diff --git a/include/CCommon.h b/include/CCommon.h
index efcd2aa..eb9ffbc 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -36,6 +36,7 @@
PRODUCER_SEND_SYNC_FAILED = 11,
PRODUCER_SEND_ONEWAY_FAILED = 12,
PRODUCER_SEND_ORDERLY_FAILED = 13,
+ PRODUCER_SEND_ASYNC_FAILED = 14,
PUSHCONSUMER_ERROR_CODE_START = 20,
PUSHCONSUMER_START_FAILED = 20,
diff --git a/include/CMQException.h b/include/CMQException.h
new file mode 100644
index 0000000..da26edd
--- /dev/null
+++ b/include/CMQException.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __C_MQEXCPTION_H__
+#define __C_MQEXCPTION_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_EXEPTION_CHAR_LENGTH 512
+
+typedef struct _CMQException_{
+ int error;
+ int line;
+ char file[MAX_EXEPTION_CHAR_LENGTH];
+ char msg[MAX_EXEPTION_CHAR_LENGTH];
+ char type[MAX_EXEPTION_CHAR_LENGTH];
+
+} CMQException;
+
+#ifdef __cplusplus
+};
+#endif
+#endif
diff --git a/include/CProducer.h b/include/CProducer.h
index 6edd99e..e75622c 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -18,9 +18,9 @@
#ifndef __C_PRODUCER_H__
#define __C_PRODUCER_H__
-#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"
+#include "CMQException.h"
#ifdef __cplusplus
extern "C" {
@@ -29,6 +29,8 @@
//typedef struct _CProducer_ _CProducer;
typedef struct CProducer CProducer;
typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg);
+typedef void(*CSendSuccessCallback)(CSendResult result);
+typedef void(*CSendExceptionCallback)(CMQException e);
ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId);
ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer);
@@ -49,6 +51,7 @@
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int size);
ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result);
+ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback cSendExceptionCallback);
ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg);
ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result);
#ifdef __cplusplus
diff --git a/include/MQClientException.h b/include/MQClientException.h
index bf29863..2c3d2ed 100755
--- a/include/MQClientException.h
+++ b/include/MQClientException.h
@@ -21,12 +21,17 @@
#include <ostream>
#include <sstream>
#include <string>
+
+#include <string.h>
#include "RocketMQClient.h"
+#include "CCommon.h"
+
namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQException : public std::exception {
+
public:
MQException(const std::string& msg, int error, const char* file,
int line) throw()
@@ -60,6 +65,10 @@
virtual const char* GetType() const throw() { return m_type.c_str(); }
+ int GetLine() { return m_line;}
+
+ const char* GetFile() { return m_file.c_str(); }
+
protected:
int m_error;
int m_line;
@@ -68,6 +77,7 @@
std::string m_type;
};
+
inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
os << "Type: " << e.GetType() << " , " << e.what();
return os;
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 0715942..c238e59 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -16,10 +16,17 @@
*/
#include "DefaultMQProducer.h"
+#include "AsyncCallback.h"
+
#include "CProducer.h"
#include "CCommon.h"
-#include <string.h>
+#include "CSendResult.h"
#include "CMessage.h"
+#include "CMQException.h"
+
+#include <string.h>
+#include <typeinfo>
+
#ifdef __cplusplus
extern "C" {
@@ -45,6 +52,35 @@
QueueSelectorCallback m_pCallback;
};
+class CSendCallback : public AutoDeleteSendCallBack{
+public:
+ CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+ m_cSendSuccessCallback = cSendSuccessCallback;
+ m_cSendExceptionCallback = cSendExceptionCallback;
+ }
+ virtual ~CSendCallback(){}
+ virtual void onSuccess(SendResult& sendResult) {
+ CSendResult result;
+ result.sendStatus = CSendStatus((int) sendResult.getSendStatus());
+ result.offset = sendResult.getQueueOffset();
+ strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
+ result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ m_cSendSuccessCallback(result);
+
+ }
+ virtual void onException(MQException& e) {
+ CMQException exception;
+ exception.error = e.GetError();
+ exception.line = e.GetLine();
+ strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1);
+ strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1);
+ m_cSendExceptionCallback( exception );
+ }
+private:
+ CSendSuccessCallback m_cSendSuccessCallback;
+ CSendExceptionCallback m_cSendExceptionCallback;
+};
+
CProducer *CreateProducer(const char *groupId) {
if (groupId == NULL) {
@@ -127,6 +163,30 @@
return OK;
}
+int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+ if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
+ MQMessage *message = (MQMessage *) msg;
+ CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback);
+
+ try {
+ defaultMQProducer->send(*message ,cSendCallback);
+ } catch (exception &e) {
+ if(cSendCallback != NULL){
+ if(typeid(e) == typeid( MQException )){
+ MQException &mqe = (MQException &)e;
+ cSendCallback->onException( mqe );
+ }
+ delete cSendCallback;
+ cSendCallback = NULL;
+ }
+ return PRODUCER_SEND_ASYNC_FAILED;
+ }
+ return OK;
+}
+
int SendMessageOneway(CProducer *producer, CMessage *msg) {
if (producer == NULL || msg == NULL) {
return NULL_POINTER;
diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp
index c7dead0..e310cf4 100644
--- a/test/src/UrlTest.cpp
+++ b/test/src/UrlTest.cpp
@@ -20,6 +20,15 @@
#include "gtest/gtest.h"
#include "gmock/gmock.h"
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "CMQException.h"
+#include <unistd.h>
+
using namespace std;
using ::testing::InitGoogleTest;
using ::testing::InitGoogleMock;
@@ -53,9 +62,13 @@
}
+
+
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(filter) = "Url.Url";
- return RUN_ALL_TESTS();
+ int itestts = RUN_ALL_TESTS();
+ printf("i %d" , itestts);
+ return itestts;
}