Realization C asynSend  (#65)

Realization C asynSend
diff --git a/example/CAsyncProducer.c b/example/CAsyncProducer.c
new file mode 100644
index 0000000..56496b5
--- /dev/null
+++ b/example/CAsyncProducer.c
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+
+#ifdef _WIN32
+#include <windows.h>
+#else
+
+#include <unistd.h>
+#include <memory.h>
+
+#endif
+
+void thread_sleep(unsigned milliseconds) {
+#ifdef _WIN32
+    Sleep(milliseconds);
+#else
+    usleep(milliseconds * 1000);  // takes microseconds
+#endif
+}
+
+void sendSuccessCallback(CSendResult result){
+	printf("Msg Send ID:%s\n", result.msgId);
+}
+
+void sendExceptionCallback(CMQException e){
+	printf("asyn send exception error : %d\n" , e.error);
+	printf("asyn send exception msg : %s\n" , e.msg);
+	printf("asyn send exception file : %s\n" , e.file);
+	printf("asyn send exception line : %d\n" , e.line);
+}
+
+void startSendMessage(CProducer *producer) {
+    int i = 0;
+    char DestMsg[256];
+    CMessage *msg = CreateMessage("T_TestTopic");
+    SetMessageTags(msg, "Test_Tag");
+    SetMessageKeys(msg, "Test_Keys");
+    CSendResult result;
+    for (i = 0; i < 10; i++) {
+        printf("send one message : %d\n", i);
+        memset(DestMsg, 0, sizeof(DestMsg));
+        snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
+        SetMessageBody(msg, DestMsg);
+        int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback);
+        printf("Async send return code: %d\n", code);
+        thread_sleep(1000);
+    }
+}
+
+void CreateProducerAndStartSendMessage(int i){
+	printf("Producer Initializing.....\n");
+	CProducer *producer = CreateProducer("Group_producer");
+	SetProducerNameServerAddress(producer, "127.0.0.1:9876");
+	if(i == 1){
+		SetProducerSendMsgTimeout(producer , 3);
+	}
+	StartProducer(producer);
+	printf("Producer start.....\n");
+	startSendMessage(producer);
+	ShutdownProducer(producer);
+	DestroyProducer(producer);
+	printf("Producer Shutdown!\n");
+}
+
+int main(int argc, char *argv[]) {
+    printf("Send Async successCallback.....\n");
+    CreateProducerAndStartSendMessage(0);
+
+    printf("Send Async exceptionCallback.....\n");
+    CreateProducerAndStartSendMessage(1);
+
+    return 0;
+}
+
diff --git a/example/Producer.c b/example/Producer.c
index cef8383..5feedd6 100644
--- a/example/Producer.c
+++ b/example/Producer.c
@@ -62,11 +62,10 @@
     printf("Producer Initializing.....\n");
 
     CProducer *producer = CreateProducer("Group_producer");
-    SetProducerNameServerAddress(producer, "172.17.0.2:9876");
+    SetProducerNameServerAddress(producer, "127.0.0.1:9876");
     StartProducer(producer);
     printf("Producer start.....\n");
     startSendMessage(producer);
-
     ShutdownProducer(producer);
     DestroyProducer(producer);
     printf("Producer Shutdown!\n");
diff --git a/example/PushConsumer.cpp b/example/PushConsumer.cpp
index d5ce021..119b7f2 100755
--- a/example/PushConsumer.cpp
+++ b/example/PushConsumer.cpp
@@ -81,7 +81,7 @@
 
   if (info.syncpush) consumer.setAsyncPull(false);  // set sync pull
   if (info.broadcasting) {
-    consumer.setMessageModel(BROADCASTING);
+    consumer.setMessageModel(rocketmq::BROADCASTING);
   }
 
   consumer.setInstanceName(info.groupname);
diff --git a/include/CCommon.h b/include/CCommon.h
index efcd2aa..eb9ffbc 100644
--- a/include/CCommon.h
+++ b/include/CCommon.h
@@ -36,6 +36,7 @@
     PRODUCER_SEND_SYNC_FAILED = 11,
     PRODUCER_SEND_ONEWAY_FAILED = 12,
     PRODUCER_SEND_ORDERLY_FAILED = 13,
+	PRODUCER_SEND_ASYNC_FAILED = 14,
 
     PUSHCONSUMER_ERROR_CODE_START = 20,
     PUSHCONSUMER_START_FAILED = 20,
diff --git a/include/CMQException.h b/include/CMQException.h
new file mode 100644
index 0000000..da26edd
--- /dev/null
+++ b/include/CMQException.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef __C_MQEXCPTION_H__
+#define __C_MQEXCPTION_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define  MAX_EXEPTION_CHAR_LENGTH 512
+
+typedef struct _CMQException_{
+	int error;
+	int line;
+	char file[MAX_EXEPTION_CHAR_LENGTH];
+	char msg[MAX_EXEPTION_CHAR_LENGTH];
+	char type[MAX_EXEPTION_CHAR_LENGTH];
+
+} CMQException;
+
+#ifdef __cplusplus
+};
+#endif
+#endif
diff --git a/include/CProducer.h b/include/CProducer.h
index 6edd99e..e75622c 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -18,9 +18,9 @@
 #ifndef __C_PRODUCER_H__
 #define __C_PRODUCER_H__
 
-#include "CCommon.h"
 #include "CMessage.h"
 #include "CSendResult.h"
+#include "CMQException.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -29,6 +29,8 @@
 //typedef struct _CProducer_ _CProducer;
 typedef struct CProducer CProducer;
 typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg);
+typedef void(*CSendSuccessCallback)(CSendResult result);
+typedef void(*CSendExceptionCallback)(CMQException e);
 
 ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId);
 ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer);
@@ -49,6 +51,7 @@
 ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int size);
 
 ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result);
+ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback cSendExceptionCallback);
 ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg);
 ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result);
 #ifdef __cplusplus
diff --git a/include/MQClientException.h b/include/MQClientException.h
index bf29863..2c3d2ed 100755
--- a/include/MQClientException.h
+++ b/include/MQClientException.h
@@ -21,12 +21,17 @@
 #include <ostream>
 #include <sstream>
 #include <string>
+
+#include <string.h>
 #include "RocketMQClient.h"
+#include "CCommon.h"
+
 
 
 namespace rocketmq {
 //<!***************************************************************************
 class ROCKETMQCLIENT_API MQException : public std::exception {
+
  public:
   MQException(const std::string& msg, int error, const char* file,
               int line) throw()
@@ -60,6 +65,10 @@
 
   virtual const char* GetType() const throw() { return m_type.c_str(); }
 
+  int GetLine() { return m_line;}
+
+  const char* GetFile()  { return m_file.c_str(); }
+
  protected:
   int m_error;
   int m_line;
@@ -68,6 +77,7 @@
   std::string m_type;
 };
 
+
 inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
   os << "Type: " << e.GetType() << " , " << e.what();
   return os;
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 0715942..c238e59 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -16,10 +16,17 @@
  */
 
 #include "DefaultMQProducer.h"
+#include "AsyncCallback.h"
+
 #include "CProducer.h"
 #include "CCommon.h"
-#include <string.h>
+#include "CSendResult.h"
 #include "CMessage.h"
+#include "CMQException.h"
+
+#include <string.h>
+#include <typeinfo>
+
 
 #ifdef __cplusplus
 extern "C" {
@@ -45,6 +52,35 @@
     QueueSelectorCallback m_pCallback;
 };
 
+class CSendCallback : public AutoDeleteSendCallBack{
+public:
+	CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+		m_cSendSuccessCallback = cSendSuccessCallback;
+		m_cSendExceptionCallback = cSendExceptionCallback;
+	}
+	virtual ~CSendCallback(){}
+	virtual void onSuccess(SendResult& sendResult) {
+		CSendResult result;
+		result.sendStatus = CSendStatus((int) sendResult.getSendStatus());
+		result.offset = sendResult.getQueueOffset();
+		strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
+		result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+		m_cSendSuccessCallback(result);
+
+	}
+    virtual void onException(MQException& e) {
+        CMQException exception;
+        exception.error = e.GetError();
+        exception.line  = e.GetLine();
+        strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1);
+        strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1);
+    	m_cSendExceptionCallback( exception );
+    }
+private:
+	CSendSuccessCallback m_cSendSuccessCallback;
+	CSendExceptionCallback m_cSendExceptionCallback;
+};
+
 
 CProducer *CreateProducer(const char *groupId) {
     if (groupId == NULL) {
@@ -127,6 +163,30 @@
     return OK;
 }
 
+int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
+	if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
+		return NULL_POINTER;
+	}
+	DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
+	MQMessage *message = (MQMessage *) msg;
+	CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback);
+
+	try {
+		defaultMQProducer->send(*message ,cSendCallback);
+	} catch (exception &e) {
+		if(cSendCallback != NULL){
+			if(typeid(e) == typeid( MQException )){
+				MQException &mqe = (MQException &)e;
+				cSendCallback->onException( mqe );
+			}
+			delete cSendCallback;
+			cSendCallback = NULL;
+		}
+		return PRODUCER_SEND_ASYNC_FAILED;
+	}
+	return OK;
+}
+
 int SendMessageOneway(CProducer *producer, CMessage *msg) {
     if (producer == NULL || msg == NULL) {
         return NULL_POINTER;
diff --git a/test/src/UrlTest.cpp b/test/src/UrlTest.cpp
index c7dead0..e310cf4 100644
--- a/test/src/UrlTest.cpp
+++ b/test/src/UrlTest.cpp
@@ -20,6 +20,15 @@
 #include "gtest/gtest.h"
 #include "gmock/gmock.h"
 
+#include <stdio.h>
+
+#include "CProducer.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "CMQException.h"
+#include <unistd.h>
+
 using namespace std;
 using ::testing::InitGoogleTest;
 using ::testing::InitGoogleMock;
@@ -53,9 +62,13 @@
 
 }
 
+
+
 int main(int argc, char* argv[]) {
 	InitGoogleMock(&argc, argv);
 
 	testing::GTEST_FLAG(filter) = "Url.Url";
-	return RUN_ALL_TESTS();
+	int itestts = RUN_ALL_TESTS();
+	printf("i %d" , itestts);
+	return itestts;
 }