refactor(transaction): use userdata to cache the local checker callback (#252)
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index d025a21..a0c3698 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -39,7 +39,15 @@
#endif
using namespace rocketmq;
using namespace std;
-
+class MyLocalTransactionExecuterInner {
+ public:
+ MyLocalTransactionExecuterInner(CLocalTransactionExecutorCallback executor, CMessage* msg, void* userData)
+ : m_ExcutorCallback(executor), message(msg), data(userData) {}
+ ~MyLocalTransactionExecuterInner() {}
+ CLocalTransactionExecutorCallback m_ExcutorCallback;
+ CMessage* message;
+ void* data;
+};
class LocalTransactionListenerInner : public TransactionListener {
public:
LocalTransactionListenerInner() {}
@@ -56,8 +64,10 @@
if (m_CheckerCallback == NULL) {
return LocalTransactionState::UNKNOWN;
}
- CMessage* msg = (CMessage*)(&message);
- CTransactionStatus status = m_ExcutorCallback(m_producer, msg, arg);
+ (void)(message);
+ MyLocalTransactionExecuterInner* executerInner = (MyLocalTransactionExecuterInner*)arg;
+ CTransactionStatus status =
+ executerInner->m_ExcutorCallback(m_producer, executerInner->message, executerInner->data);
switch (status) {
case E_COMMIT_TRANSACTION:
return LocalTransactionState::COMMIT_MESSAGE;
@@ -91,15 +101,8 @@
private:
CLocalTransactionCheckerCallback m_CheckerCallback;
- CLocalTransactionExecutorCallback m_ExcutorCallback;
-
CProducer* m_producer;
void* m_data;
-
- public:
- void setM_m_ExcutorCallback(CLocalTransactionExecutorCallback excutorcallback) {
- m_ExcutorCallback = excutorcallback;
- }
};
class SelectMessageQueueInner : public MessageQueueSelector {
@@ -584,8 +587,10 @@
try {
DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;
MQMessage* message = (MQMessage*)msg;
- defaultMQProducer->listenerInner->setM_m_ExcutorCallback(callback);
- SendResult sendResult = defaultMQProducer->innerTransactionProducer->sendMessageInTransaction(*message, userData);
+ MyLocalTransactionExecuterInner executerInner(callback, msg, userData);
+ // defaultMQProducer->listenerInner->setM_m_ExcutorCallback(callback);
+ SendResult sendResult =
+ defaultMQProducer->innerTransactionProducer->sendMessageInTransaction(*message, &executerInner);
result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);