feat(callback): use start pointer to manager callbacks (#232)

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f2829a9..8d72d64 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -414,11 +414,12 @@
                                        int retrySendTimes) {
   int64 begin_time = UtilAll::currentTimeMillis();
   //<!delete in future;
-  AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
+  // AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
 
   LOG_DEBUG("sendMessageAsync request:%s, timeout:%lld, maxRetryTimes:%d retrySendTimes:%d", request.ToString().data(),
             timeoutMilliseconds, maxRetryTimes, retrySendTimes);
-
+  // Use smart ptr to control cbw.
+  std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<SendCallbackWrap>(brokerName, msg, pSendCallback, this);
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes) == false) {
     LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", addr.c_str(),
              msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
@@ -441,9 +442,9 @@
     LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
               addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
 
-    if (cbw) {
+    if (cbw && pSendCallback != nullptr) {
       cbw->onException();
-      deleteAndZero(cbw);
+      // deleteAndZero(cbw);
     } else {
       THROW_MQEXCEPTION(MQClientException, "sendMessageAsync failed", -1);
     }
@@ -481,12 +482,12 @@
                                        int timeoutMillis,
                                        PullCallback* pullCallback,
                                        void* pArg) {
-  //<!delete in future;
-  AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+  // AsyncCallbackWrap* cbw = new PullCallbackWrap(pullCallback, this, pArg);
+  std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<PullCallbackWrap>(pullCallback, this, pArg);
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
     LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
               static_cast<AsyncArg*>(pArg)->mq.toString().data());
-    deleteAndZero(cbw);
+    // deleteAndZero(cbw);
     THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
   }
 }
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
index cffa219..4fe29a9 100644
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -129,14 +129,14 @@
 }
 
 //<!************************************************************************
-PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
+PullCallbackWrap::PullCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
     : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
   m_pArg = *static_cast<AsyncArg*>(pArg);
 }
 
-PullCallbackWarp::~PullCallbackWarp() {}
+PullCallbackWrap::~PullCallbackWrap() {}
 
-void PullCallbackWarp::onException() {
+void PullCallbackWrap::onException() {
   if (m_pAsyncCallBack == NULL)
     return;
 
@@ -149,7 +149,7 @@
   }
 }
 
-void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
   unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
   if (m_pAsyncCallBack == NULL) {
     LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
diff --git a/src/common/AsyncCallbackWrap.h b/src/common/AsyncCallbackWrap.h
index c4b3f66..8802fdb 100644
--- a/src/common/AsyncCallbackWrap.h
+++ b/src/common/AsyncCallbackWrap.h
@@ -29,7 +29,7 @@
 class MQClientAPIImpl;

 class DefaultMQProducer;

 //<!***************************************************************************

-enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWarp = 2 };

+enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWrap = 2 };

 

 struct AsyncCallbackWrap {

  public:

@@ -50,7 +50,7 @@
   SendCallbackWrap(const string& brokerName,

                    const MQMessage& msg,

                    AsyncCallback* pAsyncCallback,

-                   MQClientAPIImpl* pclientAPI);

+                   MQClientAPIImpl* pClientAPI);

 

   virtual ~SendCallbackWrap(){};

   virtual void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest);

@@ -63,13 +63,13 @@
 };

 

 //<!***************************************************************************

-class PullCallbackWarp : public AsyncCallbackWrap {

+class PullCallbackWrap : public AsyncCallbackWrap {

  public:

-  PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg);

-  virtual ~PullCallbackWarp();

+  PullCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pClientAPI, void* pArg);

+  virtual ~PullCallbackWrap();

   virtual void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest);

   virtual void onException();

-  virtual asyncCallBackType getCallbackType() { return pullCallbackWarp; }

+  virtual asyncCallBackType getCallbackType() { return pullCallbackWrap; }

 

  private:

   AsyncArg m_pArg;

diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
old mode 100755
new mode 100644
index b0b2613..8ac926b
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -29,7 +29,7 @@
                                TcpRemotingClient* powner,
                                int64 timeout,
                                bool bAsync,
-                               AsyncCallbackWrap* pCallback)
+                               std::shared_ptr<AsyncCallbackWrap> pCallback)
     : m_requestCode(requestCode),
       m_opaque(opaque),
       m_timeout(timeout),
@@ -45,13 +45,7 @@
   m_beginTimestamp = UtilAll::currentTimeMillis();
 }
 
-ResponseFuture::~ResponseFuture() {
-  deleteAndZero(m_pCallbackWrap);
-  /*
-    do not delete m_pResponseCommand when destruct, as m_pResponseCommand
-    is used by MQClientAPIImpl concurrently, and will be released by producer or consumer;
-   */
-}
+ResponseFuture::~ResponseFuture() {}
 
 void ResponseFuture::releaseThreadCondition() {
   m_defaultEvent.notify_all();
@@ -174,7 +168,7 @@
   return m_pResponseCommand;
 }
 
-AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
+std::shared_ptr<AsyncCallbackWrap> ResponseFuture::getAsyncCallbackWrap() {
   return m_pCallbackWrap;
 }
 
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
old mode 100755
new mode 100644
index 66be663..a2a4a12
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -41,7 +41,7 @@
                  TcpRemotingClient* powner,
                  int64 timeoutMilliseconds,
                  bool bAsync = false,
-                 AsyncCallbackWrap* pCallback = nullptr);
+                 std::shared_ptr<AsyncCallbackWrap> pCallback = std::shared_ptr<AsyncCallbackWrap>());
   virtual ~ResponseFuture();
 
   void releaseThreadCondition();
@@ -63,7 +63,7 @@
   int getRetrySendTimes() const;
   int64 leftTime() const;
   const bool getAsyncFlag();
-  AsyncCallbackWrap* getAsyncCallbackWrap();
+  std::shared_ptr<AsyncCallbackWrap> getAsyncCallbackWrap();
 
   void setMaxRetrySendTimes(int maxRetryTimes);
   void setRetrySendTimes(int retryTimes);
@@ -78,7 +78,7 @@
   int64 m_timeout;  // ms
 
   const bool m_bAsync;
-  AsyncCallbackWrap* m_pCallbackWrap;
+  std::shared_ptr<AsyncCallbackWrap> m_pCallbackWrap;
 
   AsyncCallbackStatus m_asyncCallbackStatus;
   std::mutex m_asyncCallbackLock;
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
old mode 100755
new mode 100644
index 0207bbe..4c708f7
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -159,8 +159,8 @@
   if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-
-    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
+    std::shared_ptr<AsyncCallbackWrap> cbw;
+    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw));
     addResponseFuture(opaque, responseFuture);
 
     if (SendCommand(pTcp, request)) {
@@ -193,8 +193,8 @@
   if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-
-    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
+    std::shared_ptr<AsyncCallbackWrap> cbw;
+    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw));
     addResponseFuture(opaque, responseFuture);
 
     if (SendCommand(pTcp, request)) {
@@ -224,7 +224,7 @@
 
 bool TcpRemotingClient::invokeAsync(const string& addr,
                                     RemotingCommand& request,
-                                    AsyncCallbackWrap* callback,
+                                    std::shared_ptr<AsyncCallbackWrap> callback,
                                     int64 timeoutMillis,
                                     int maxRetrySendTimes,
                                     int retrySendTimes) {
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 265177f..66760b6 100644
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -49,7 +49,7 @@
 
   virtual bool invokeAsync(const string& addr,
                            RemotingCommand& request,
-                           AsyncCallbackWrap* cbw,
+                           std::shared_ptr<AsyncCallbackWrap> cbw,
                            int64 timeoutMilliseconds,
                            int maxRetrySendTimes = 1,
                            int retrySendTimes = 1);
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
index 3b2ab07..2c0687a 100644
--- a/test/src/MQClientAPIImpTest.cpp
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -39,6 +39,7 @@
       : TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
 
   MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
+  MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int));
 };
 class MockMQClientAPIImpl : public MQClientAPIImpl {
  public:
@@ -137,6 +138,18 @@
   int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
   EXPECT_EQ(2048, offset);
 }
+class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack {
+ public:
+  virtual ~MyMockAutoDeleteSendCallback() {}
+  virtual void onSuccess(SendResult& sendResult) {
+    std::cout << "send Success" << std::endl;
+    return;
+  }
+  virtual void onException(MQException& e) {
+    std::cout << "send Exception" << e << std::endl;
+    return;
+  }
+};
 
 TEST(MQClientAPIImplTest, sendMessage) {
   string cid = "testClientId";
@@ -169,6 +182,57 @@
   EXPECT_EQ(result.getOffsetMsgId(), "MessageID");
   EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
   EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
+
+  // Try to test Async send
+
+  EXPECT_CALL(*pClient, invokeAsync(_, _, _, _, _, _))
+      .Times(7)
+      .WillOnce(Return(false))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false));
+
+  SendMessageRequestHeader* requestHeader2 = new SendMessageRequestHeader();
+  requestHeader2->producerGroup = cid;
+  requestHeader2->topic = (message.getTopic());
+  requestHeader2->defaultTopic = DEFAULT_TOPIC;
+  requestHeader2->defaultTopicQueueNums = 4;
+  requestHeader2->bornTimestamp = UtilAll::currentTimeMillis();
+  EXPECT_ANY_THROW(
+      impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader2, 100, 1, ComMode_ASYNC, nullptr, sc));
+
+  SendMessageRequestHeader* requestHeader3 = new SendMessageRequestHeader();
+  requestHeader3->producerGroup = cid;
+  requestHeader3->topic = (message.getTopic());
+  requestHeader3->defaultTopic = DEFAULT_TOPIC;
+  requestHeader3->defaultTopicQueueNums = 4;
+  requestHeader3->bornTimestamp = UtilAll::currentTimeMillis();
+  SendCallback* pSendCallback = new MyMockAutoDeleteSendCallback();
+  EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader3, 100, 1, ComMode_ASYNC,
+                                    pSendCallback, sc));
+
+  SendMessageRequestHeader* requestHeader4 = new SendMessageRequestHeader();
+  requestHeader4->producerGroup = cid;
+  requestHeader4->topic = (message.getTopic());
+  requestHeader4->defaultTopic = DEFAULT_TOPIC;
+  requestHeader4->defaultTopicQueueNums = 4;
+  requestHeader4->bornTimestamp = UtilAll::currentTimeMillis();
+  SendCallback* pSendCallback2 = new MyMockAutoDeleteSendCallback();
+  EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader4, 1000, 2, ComMode_ASYNC,
+                                    pSendCallback2, sc));
+
+  SendMessageRequestHeader* requestHeader5 = new SendMessageRequestHeader();
+  requestHeader5->producerGroup = cid;
+  requestHeader5->topic = (message.getTopic());
+  requestHeader5->defaultTopic = DEFAULT_TOPIC;
+  requestHeader5->defaultTopicQueueNums = 4;
+  requestHeader5->bornTimestamp = UtilAll::currentTimeMillis();
+  SendCallback* pSendCallback3 = new MyMockAutoDeleteSendCallback();
+  EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader5, 1000, 3, ComMode_ASYNC,
+                                    pSendCallback3, sc));
 }
 
 TEST(MQClientAPIImplTest, consumerSendMessageBack) {
diff --git a/test/src/transport/ResponseFutureTest.cpp b/test/src/transport/ResponseFutureTest.cpp
index 9c23ed8..058dbe7 100644
--- a/test/src/transport/ResponseFutureTest.cpp
+++ b/test/src/transport/ResponseFutureTest.cpp
@@ -68,8 +68,8 @@
   EXPECT_TRUE(responseFuture.getAsyncCallbackWrap() == nullptr);
 
   // ~ResponseFuture  delete pcall
-  SendCallbackWrap* pcall = new SendCallbackWrap("", MQMessage(), nullptr, nullptr);
-  ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, pcall);
+  std::shared_ptr<AsyncCallbackWrap> callBack = std::make_shared<SendCallbackWrap>("", MQMessage(), nullptr, nullptr);
+  ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, callBack);
   EXPECT_TRUE(twoResponseFuture.getAsyncFlag());
   EXPECT_FALSE(twoResponseFuture.getAsyncCallbackWrap() == nullptr);
 }
@@ -104,10 +104,10 @@
   ResponseFuture twoResponseFuture(13, 4, NULL, 1000, true);
   EXPECT_TRUE(twoResponseFuture.getAsyncFlag());
 
-  ResponseFuture threeSesponseFuture(13, 4, NULL, 1000);
+  ResponseFuture threeResponseFuture(13, 4, NULL, 1000);
 
   uint64_t millis = UtilAll::currentTimeMillis();
-  RemotingCommand* remotingCommand = threeSesponseFuture.waitResponse(10);
+  RemotingCommand* remotingCommand = threeResponseFuture.waitResponse(10);
   uint64_t useTime = UtilAll::currentTimeMillis() - millis;
   EXPECT_LT(useTime, 30);