feat(callback): use start pointer to manager callbacks (#232)
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f2829a9..8d72d64 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -414,11 +414,12 @@
int retrySendTimes) {
int64 begin_time = UtilAll::currentTimeMillis();
//<!delete in future;
- AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
+ // AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
LOG_DEBUG("sendMessageAsync request:%s, timeout:%lld, maxRetryTimes:%d retrySendTimes:%d", request.ToString().data(),
timeoutMilliseconds, maxRetryTimes, retrySendTimes);
-
+ // Use smart ptr to control cbw.
+ std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<SendCallbackWrap>(brokerName, msg, pSendCallback, this);
if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes) == false) {
LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", addr.c_str(),
msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
@@ -441,9 +442,9 @@
LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d",
addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
- if (cbw) {
+ if (cbw && pSendCallback != nullptr) {
cbw->onException();
- deleteAndZero(cbw);
+ // deleteAndZero(cbw);
} else {
THROW_MQEXCEPTION(MQClientException, "sendMessageAsync failed", -1);
}
@@ -481,12 +482,12 @@
int timeoutMillis,
PullCallback* pullCallback,
void* pArg) {
- //<!delete in future;
- AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+ // AsyncCallbackWrap* cbw = new PullCallbackWrap(pullCallback, this, pArg);
+ std::shared_ptr<AsyncCallbackWrap> cbw = std::make_shared<PullCallbackWrap>(pullCallback, this, pArg);
if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
static_cast<AsyncArg*>(pArg)->mq.toString().data());
- deleteAndZero(cbw);
+ // deleteAndZero(cbw);
THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
}
}
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
index cffa219..4fe29a9 100644
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -129,14 +129,14 @@
}
//<!************************************************************************
-PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
+PullCallbackWrap::PullCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
: AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
m_pArg = *static_cast<AsyncArg*>(pArg);
}
-PullCallbackWarp::~PullCallbackWarp() {}
+PullCallbackWrap::~PullCallbackWrap() {}
-void PullCallbackWarp::onException() {
+void PullCallbackWrap::onException() {
if (m_pAsyncCallBack == NULL)
return;
@@ -149,7 +149,7 @@
}
}
-void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+void PullCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
if (m_pAsyncCallBack == NULL) {
LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
diff --git a/src/common/AsyncCallbackWrap.h b/src/common/AsyncCallbackWrap.h
index c4b3f66..8802fdb 100644
--- a/src/common/AsyncCallbackWrap.h
+++ b/src/common/AsyncCallbackWrap.h
@@ -29,7 +29,7 @@
class MQClientAPIImpl;
class DefaultMQProducer;
//<!***************************************************************************
-enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWarp = 2 };
+enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWrap = 2 };
struct AsyncCallbackWrap {
public:
@@ -50,7 +50,7 @@
SendCallbackWrap(const string& brokerName,
const MQMessage& msg,
AsyncCallback* pAsyncCallback,
- MQClientAPIImpl* pclientAPI);
+ MQClientAPIImpl* pClientAPI);
virtual ~SendCallbackWrap(){};
virtual void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest);
@@ -63,13 +63,13 @@
};
//<!***************************************************************************
-class PullCallbackWarp : public AsyncCallbackWrap {
+class PullCallbackWrap : public AsyncCallbackWrap {
public:
- PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg);
- virtual ~PullCallbackWarp();
+ PullCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pClientAPI, void* pArg);
+ virtual ~PullCallbackWrap();
virtual void operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest);
virtual void onException();
- virtual asyncCallBackType getCallbackType() { return pullCallbackWarp; }
+ virtual asyncCallBackType getCallbackType() { return pullCallbackWrap; }
private:
AsyncArg m_pArg;
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
old mode 100755
new mode 100644
index b0b2613..8ac926b
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -29,7 +29,7 @@
TcpRemotingClient* powner,
int64 timeout,
bool bAsync,
- AsyncCallbackWrap* pCallback)
+ std::shared_ptr<AsyncCallbackWrap> pCallback)
: m_requestCode(requestCode),
m_opaque(opaque),
m_timeout(timeout),
@@ -45,13 +45,7 @@
m_beginTimestamp = UtilAll::currentTimeMillis();
}
-ResponseFuture::~ResponseFuture() {
- deleteAndZero(m_pCallbackWrap);
- /*
- do not delete m_pResponseCommand when destruct, as m_pResponseCommand
- is used by MQClientAPIImpl concurrently, and will be released by producer or consumer;
- */
-}
+ResponseFuture::~ResponseFuture() {}
void ResponseFuture::releaseThreadCondition() {
m_defaultEvent.notify_all();
@@ -174,7 +168,7 @@
return m_pResponseCommand;
}
-AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
+std::shared_ptr<AsyncCallbackWrap> ResponseFuture::getAsyncCallbackWrap() {
return m_pCallbackWrap;
}
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
old mode 100755
new mode 100644
index 66be663..a2a4a12
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -41,7 +41,7 @@
TcpRemotingClient* powner,
int64 timeoutMilliseconds,
bool bAsync = false,
- AsyncCallbackWrap* pCallback = nullptr);
+ std::shared_ptr<AsyncCallbackWrap> pCallback = std::shared_ptr<AsyncCallbackWrap>());
virtual ~ResponseFuture();
void releaseThreadCondition();
@@ -63,7 +63,7 @@
int getRetrySendTimes() const;
int64 leftTime() const;
const bool getAsyncFlag();
- AsyncCallbackWrap* getAsyncCallbackWrap();
+ std::shared_ptr<AsyncCallbackWrap> getAsyncCallbackWrap();
void setMaxRetrySendTimes(int maxRetryTimes);
void setRetrySendTimes(int retryTimes);
@@ -78,7 +78,7 @@
int64 m_timeout; // ms
const bool m_bAsync;
- AsyncCallbackWrap* m_pCallbackWrap;
+ std::shared_ptr<AsyncCallbackWrap> m_pCallbackWrap;
AsyncCallbackStatus m_asyncCallbackStatus;
std::mutex m_asyncCallbackLock;
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
old mode 100755
new mode 100644
index 0207bbe..4c708f7
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -159,8 +159,8 @@
if (pTcp != nullptr) {
int code = request.getCode();
int opaque = request.getOpaque();
-
- std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
+ std::shared_ptr<AsyncCallbackWrap> cbw;
+ std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw));
addResponseFuture(opaque, responseFuture);
if (SendCommand(pTcp, request)) {
@@ -193,8 +193,8 @@
if (pTcp != nullptr) {
int code = request.getCode();
int opaque = request.getOpaque();
-
- std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
+ std::shared_ptr<AsyncCallbackWrap> cbw;
+ std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis, false, cbw));
addResponseFuture(opaque, responseFuture);
if (SendCommand(pTcp, request)) {
@@ -224,7 +224,7 @@
bool TcpRemotingClient::invokeAsync(const string& addr,
RemotingCommand& request,
- AsyncCallbackWrap* callback,
+ std::shared_ptr<AsyncCallbackWrap> callback,
int64 timeoutMillis,
int maxRetrySendTimes,
int retrySendTimes) {
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 265177f..66760b6 100644
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -49,7 +49,7 @@
virtual bool invokeAsync(const string& addr,
RemotingCommand& request,
- AsyncCallbackWrap* cbw,
+ std::shared_ptr<AsyncCallbackWrap> cbw,
int64 timeoutMilliseconds,
int maxRetrySendTimes = 1,
int retrySendTimes = 1);
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
index 3b2ab07..2c0687a 100644
--- a/test/src/MQClientAPIImpTest.cpp
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -39,6 +39,7 @@
: TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
+ MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int));
};
class MockMQClientAPIImpl : public MQClientAPIImpl {
public:
@@ -137,6 +138,18 @@
int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
EXPECT_EQ(2048, offset);
}
+class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack {
+ public:
+ virtual ~MyMockAutoDeleteSendCallback() {}
+ virtual void onSuccess(SendResult& sendResult) {
+ std::cout << "send Success" << std::endl;
+ return;
+ }
+ virtual void onException(MQException& e) {
+ std::cout << "send Exception" << e << std::endl;
+ return;
+ }
+};
TEST(MQClientAPIImplTest, sendMessage) {
string cid = "testClientId";
@@ -169,6 +182,57 @@
EXPECT_EQ(result.getOffsetMsgId(), "MessageID");
EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
+
+ // Try to test Async send
+
+ EXPECT_CALL(*pClient, invokeAsync(_, _, _, _, _, _))
+ .Times(7)
+ .WillOnce(Return(false))
+ .WillOnce(Return(true))
+ .WillOnce(Return(false))
+ .WillOnce(Return(true))
+ .WillOnce(Return(false))
+ .WillOnce(Return(false))
+ .WillOnce(Return(false));
+
+ SendMessageRequestHeader* requestHeader2 = new SendMessageRequestHeader();
+ requestHeader2->producerGroup = cid;
+ requestHeader2->topic = (message.getTopic());
+ requestHeader2->defaultTopic = DEFAULT_TOPIC;
+ requestHeader2->defaultTopicQueueNums = 4;
+ requestHeader2->bornTimestamp = UtilAll::currentTimeMillis();
+ EXPECT_ANY_THROW(
+ impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader2, 100, 1, ComMode_ASYNC, nullptr, sc));
+
+ SendMessageRequestHeader* requestHeader3 = new SendMessageRequestHeader();
+ requestHeader3->producerGroup = cid;
+ requestHeader3->topic = (message.getTopic());
+ requestHeader3->defaultTopic = DEFAULT_TOPIC;
+ requestHeader3->defaultTopicQueueNums = 4;
+ requestHeader3->bornTimestamp = UtilAll::currentTimeMillis();
+ SendCallback* pSendCallback = new MyMockAutoDeleteSendCallback();
+ EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader3, 100, 1, ComMode_ASYNC,
+ pSendCallback, sc));
+
+ SendMessageRequestHeader* requestHeader4 = new SendMessageRequestHeader();
+ requestHeader4->producerGroup = cid;
+ requestHeader4->topic = (message.getTopic());
+ requestHeader4->defaultTopic = DEFAULT_TOPIC;
+ requestHeader4->defaultTopicQueueNums = 4;
+ requestHeader4->bornTimestamp = UtilAll::currentTimeMillis();
+ SendCallback* pSendCallback2 = new MyMockAutoDeleteSendCallback();
+ EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader4, 1000, 2, ComMode_ASYNC,
+ pSendCallback2, sc));
+
+ SendMessageRequestHeader* requestHeader5 = new SendMessageRequestHeader();
+ requestHeader5->producerGroup = cid;
+ requestHeader5->topic = (message.getTopic());
+ requestHeader5->defaultTopic = DEFAULT_TOPIC;
+ requestHeader5->defaultTopicQueueNums = 4;
+ requestHeader5->bornTimestamp = UtilAll::currentTimeMillis();
+ SendCallback* pSendCallback3 = new MyMockAutoDeleteSendCallback();
+ EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader5, 1000, 3, ComMode_ASYNC,
+ pSendCallback3, sc));
}
TEST(MQClientAPIImplTest, consumerSendMessageBack) {
diff --git a/test/src/transport/ResponseFutureTest.cpp b/test/src/transport/ResponseFutureTest.cpp
index 9c23ed8..058dbe7 100644
--- a/test/src/transport/ResponseFutureTest.cpp
+++ b/test/src/transport/ResponseFutureTest.cpp
@@ -68,8 +68,8 @@
EXPECT_TRUE(responseFuture.getAsyncCallbackWrap() == nullptr);
// ~ResponseFuture delete pcall
- SendCallbackWrap* pcall = new SendCallbackWrap("", MQMessage(), nullptr, nullptr);
- ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, pcall);
+ std::shared_ptr<AsyncCallbackWrap> callBack = std::make_shared<SendCallbackWrap>("", MQMessage(), nullptr, nullptr);
+ ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, callBack);
EXPECT_TRUE(twoResponseFuture.getAsyncFlag());
EXPECT_FALSE(twoResponseFuture.getAsyncCallbackWrap() == nullptr);
}
@@ -104,10 +104,10 @@
ResponseFuture twoResponseFuture(13, 4, NULL, 1000, true);
EXPECT_TRUE(twoResponseFuture.getAsyncFlag());
- ResponseFuture threeSesponseFuture(13, 4, NULL, 1000);
+ ResponseFuture threeResponseFuture(13, 4, NULL, 1000);
uint64_t millis = UtilAll::currentTimeMillis();
- RemotingCommand* remotingCommand = threeSesponseFuture.waitResponse(10);
+ RemotingCommand* remotingCommand = threeResponseFuture.waitResponse(10);
uint64_t useTime = UtilAll::currentTimeMillis() - millis;
EXPECT_LT(useTime, 30);