feat(producer) polish async send message
diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp
index cf80639..32f1566 100644
--- a/src/PythonWrapper.cpp
+++ b/src/PythonWrapper.cpp
@@ -32,8 +32,6 @@
"PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";
map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
-map<void *, PyObject *> g_SendSuccessCallbackMap;
-map<void *, PyObject *> g_SendExceptionCallbackMap;
class PyThreadStateLock {
public:
@@ -164,25 +162,27 @@
return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
}
-void PySendSuccessCallback(CSendResult result){
- map<void *, PyObject *>::iterator iter = g_SendSuccessCallbackMap.begin();
- while(iter != g_SendSuccessCallbackMap.end()) {
- boost::python::call<void>(iter->second, iter->first);
- }
+void PySendSuccessCallback(CSendResult result, CMessage* msg, void* pyCallback){
+ PySendResult ret;
+ PyCallback *callback = (PyCallback *)pyCallback;
+ PyMessage message = { .pMessage = msg };
+ ret.sendStatus = result.sendStatus;
+ ret.offset = result.offset;
+ strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
+ ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ boost::python::call<void>(callback->successCallback, sendResult, message);
}
-void PySendExceptionCallback(CMQException e){
- map<void *, PyObject *>::iterator iter = g_SendExceptionCallbackMap.begin();
- while(iter != g_SendExceptionCallbackMap.end()) {
- boost::python::call<void>(iter->second, iter->first);
- }
+
+void PySendExceptionCallback(CMQException e, CMessage* msg, void* pyCallback){
+ PyCallback *callback = (PyCallback *)pyCallback;
+ PyMessageExt message = { .pMessage = msg };
+ boost::python::call<void>(callback->execptionCallback, message, e);
}
-int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback , PyObject *sendExceptionCallback){
- g_SendSuccessCallbackMap[msg] = sendSuccessCallback;
- g_SendExceptionCallbackMap[msg] = sendExceptionCallback;
-
- return SendMessageAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback);
+int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){
+ PyCallback pyCallback = {sendSuccessCallback, sendExceptionCallback};
+ return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, &pyCallback);
}
@@ -327,6 +327,7 @@
.def_readonly("sendStatus", &PySendResult::sendStatus, "sendStatus")
.def("GetMsgId", &PySendResult::GetMsgId);
class_<PyMessageExt>("CMessageExt");
+ class_<PyMessage>("CMessage");
//For Message
def("CreateMessage", PyCreateMessage, return_value_policy<return_opaque_pointer>());
diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h
index 3dcbf15..6ade5cb 100644
--- a/src/PythonWrapper.h
+++ b/src/PythonWrapper.h
@@ -41,11 +41,20 @@
CMessageExt *pMessageExt;
} PyMessageExt;
+typedef struct _PyMessage_ {
+ CMessage *pMessage;
+} PyMessage;
+
typedef struct _PyUserData_ {
PyObject *pyObject;
void *pData;
} PyUserData;
+typedef struct _PyCallback_ {
+ PyObject *successCallback;
+ PyObject *execptionCallback;
+} PyCallback;
+
#define PYTHON_CLIENT_VERSION "1.2.0"
#define PYCLI_BUILD_DATE "04-12-2018"
@@ -89,7 +98,7 @@
void PySendSuccessCallback(CSendResult result);
void PySendExceptionCallback(CMQException ex);
-int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback , PyObject *sendExceptionCallback);
+int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback, void* userData);
PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector);