| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "CProducer.h" |
| #include <string.h> |
| #include <typeindex> |
| #include <string.h> |
| #include <typeinfo> |
| #include "AsyncCallback.h" |
| #include "CBatchMessage.h" |
| #include "CCommon.h" |
| #include "CMQException.h" |
| #include "CMessage.h" |
| #include "CSendResult.h" |
| #include "DefaultMQProducer.h" |
| #include "MQClientErrorContainer.h" |
| |
| #ifdef __cplusplus |
| extern "C" { |
| #endif |
| using namespace rocketmq; |
| using namespace std; |
| |
| class SelectMessageQueue : public MessageQueueSelector { |
| public: |
| SelectMessageQueue(QueueSelectorCallback callback) { m_pCallback = callback; } |
| |
| MQMessageQueue select(const std::vector<MQMessageQueue>& mqs, const MQMessage& msg, void* arg) { |
| CMessage* message = (CMessage*)&msg; |
| // Get the index of sending MQMessageQueue through callback function. |
| int index = m_pCallback(mqs.size(), message, arg); |
| return mqs[index]; |
| } |
| |
| private: |
| QueueSelectorCallback m_pCallback; |
| }; |
| |
| class CSendCallback : public AutoDeleteSendCallBack { |
| public: |
| CSendCallback(CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback) { |
| m_cSendSuccessCallback = cSendSuccessCallback; |
| m_cSendExceptionCallback = cSendExceptionCallback; |
| } |
| virtual ~CSendCallback() {} |
| virtual void onSuccess(SendResult& sendResult) { |
| CSendResult result; |
| result.sendStatus = CSendStatus((int)sendResult.getSendStatus()); |
| result.offset = sendResult.getQueueOffset(); |
| strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); |
| result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; |
| m_cSendSuccessCallback(result); |
| } |
| virtual void onException(MQException& e) { |
| CMQException exception; |
| exception.error = e.GetError(); |
| exception.line = e.GetLine(); |
| strncpy(exception.msg, e.what(), MAX_EXEPTION_MSG_LENGTH - 1); |
| strncpy(exception.file, e.GetFile(), MAX_EXEPTION_FILE_LENGTH - 1); |
| m_cSendExceptionCallback(exception); |
| } |
| |
| private: |
| CSendSuccessCallback m_cSendSuccessCallback; |
| CSendExceptionCallback m_cSendExceptionCallback; |
| }; |
| |
| CProducer* CreateProducer(const char* groupId) { |
| if (groupId == NULL) { |
| return NULL; |
| } |
| DefaultMQProducer* defaultMQProducer = new DefaultMQProducer(groupId); |
| return (CProducer*)defaultMQProducer; |
| } |
| int DestroyProducer(CProducer* pProducer) { |
| if (pProducer == NULL) { |
| return NULL_POINTER; |
| } |
| delete reinterpret_cast<DefaultMQProducer*>(pProducer); |
| return OK; |
| } |
| int StartProducer(CProducer* producer) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| try { |
| ((DefaultMQProducer*)producer)->start(); |
| } catch (exception& e) { |
| MQClientErrorContainer::setErr(string(e.what())); |
| return PRODUCER_START_FAILED; |
| } |
| return OK; |
| } |
| int ShutdownProducer(CProducer* producer) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->shutdown(); |
| return OK; |
| } |
| int SetProducerNameServerAddress(CProducer* producer, const char* namesrv) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setNamesrvAddr(namesrv); |
| return OK; |
| } |
| int SetProducerNameServerDomain(CProducer* producer, const char* domain) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setNamesrvDomain(domain); |
| return OK; |
| } |
| int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result) { |
| // CSendResult sendResult; |
| if (producer == NULL || msg == NULL || result == NULL) { |
| return NULL_POINTER; |
| } |
| try { |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| MQMessage* message = (MQMessage*)msg; |
| SendResult sendResult = defaultMQProducer->send(*message); |
| switch (sendResult.getSendStatus()) { |
| case SEND_OK: |
| result->sendStatus = E_SEND_OK; |
| break; |
| case SEND_FLUSH_DISK_TIMEOUT: |
| result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT; |
| break; |
| case SEND_FLUSH_SLAVE_TIMEOUT: |
| result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT; |
| break; |
| case SEND_SLAVE_NOT_AVAILABLE: |
| result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE; |
| break; |
| default: |
| result->sendStatus = E_SEND_OK; |
| break; |
| } |
| result->offset = sendResult.getQueueOffset(); |
| strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); |
| result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; |
| } catch (exception& e) { |
| MQClientErrorContainer::setErr(string(e.what())); |
| return PRODUCER_SEND_SYNC_FAILED; |
| } |
| return OK; |
| } |
| |
| int SendBatchMessage(CProducer* producer, CBatchMessage* batcMsg, CSendResult* result) { |
| // CSendResult sendResult; |
| if (producer == NULL || batcMsg == NULL || result == NULL) { |
| return NULL_POINTER; |
| } |
| try { |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| vector<MQMessage>* message = (vector<MQMessage>*)batcMsg; |
| SendResult sendResult = defaultMQProducer->send(*message); |
| switch (sendResult.getSendStatus()) { |
| case SEND_OK: |
| result->sendStatus = E_SEND_OK; |
| break; |
| case SEND_FLUSH_DISK_TIMEOUT: |
| result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT; |
| break; |
| case SEND_FLUSH_SLAVE_TIMEOUT: |
| result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT; |
| break; |
| case SEND_SLAVE_NOT_AVAILABLE: |
| result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE; |
| break; |
| default: |
| result->sendStatus = E_SEND_OK; |
| break; |
| } |
| result->offset = sendResult.getQueueOffset(); |
| strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); |
| result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; |
| } catch (exception& e) { |
| return PRODUCER_SEND_SYNC_FAILED; |
| } |
| return OK; |
| } |
| |
| int SendMessageAsync(CProducer* producer, |
| CMessage* msg, |
| CSendSuccessCallback cSendSuccessCallback, |
| CSendExceptionCallback cSendExceptionCallback) { |
| if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) { |
| return NULL_POINTER; |
| } |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| MQMessage* message = (MQMessage*)msg; |
| CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback, cSendExceptionCallback); |
| |
| try { |
| defaultMQProducer->send(*message, cSendCallback); |
| } catch (exception& e) { |
| if (cSendCallback != NULL) { |
| if (std::type_index(typeid(e)) == std::type_index(typeid(MQException))) { |
| MQException& mqe = (MQException&)e; |
| cSendCallback->onException(mqe); |
| } |
| delete cSendCallback; |
| cSendCallback = NULL; |
| } |
| MQClientErrorContainer::setErr(string(e.what())); |
| return PRODUCER_SEND_ASYNC_FAILED; |
| } |
| return OK; |
| } |
| |
| int SendMessageOneway(CProducer* producer, CMessage* msg) { |
| if (producer == NULL || msg == NULL) { |
| return NULL_POINTER; |
| } |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| MQMessage* message = (MQMessage*)msg; |
| try { |
| defaultMQProducer->sendOneway(*message); |
| } catch (exception& e) { |
| return PRODUCER_SEND_ONEWAY_FAILED; |
| } |
| return OK; |
| } |
| |
| int SendMessageOnewayOrderly(CProducer* producer, CMessage* msg, QueueSelectorCallback selector, void* arg) { |
| if (producer == NULL || msg == NULL) { |
| return NULL_POINTER; |
| } |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| MQMessage* message = (MQMessage*)msg; |
| try { |
| SelectMessageQueue selectMessageQueue(selector); |
| defaultMQProducer->sendOneway(*message, &selectMessageQueue, arg); |
| } catch (exception& e) { |
| MQClientErrorContainer::setErr(string(e.what())); |
| return PRODUCER_SEND_ONEWAY_FAILED; |
| } |
| return OK; |
| } |
| |
| int SendMessageOrderlyAsync(CProducer* producer, |
| CMessage* msg, |
| QueueSelectorCallback callback, |
| void* arg, |
| CSendSuccessCallback cSendSuccessCallback, |
| CSendExceptionCallback cSendExceptionCallback) { |
| if (producer == NULL || msg == NULL || callback == NULL || cSendSuccessCallback == NULL || |
| cSendExceptionCallback == NULL) { |
| return NULL_POINTER; |
| } |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| MQMessage* message = (MQMessage*)msg; |
| CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback, cSendExceptionCallback); |
| |
| try { |
| // Constructing SelectMessageQueue objects through function pointer callback |
| SelectMessageQueue selectMessageQueue(callback); |
| defaultMQProducer->send(*message, &selectMessageQueue, arg, cSendCallback); |
| } catch (exception& e) { |
| printf("%s\n", e.what()); |
| // std::count<<e.what( )<<std::endl; |
| MQClientErrorContainer::setErr(string(e.what())); |
| return PRODUCER_SEND_ORDERLYASYNC_FAILED; |
| } |
| return OK; |
| } |
| |
| int SendMessageOrderly(CProducer* producer, |
| CMessage* msg, |
| QueueSelectorCallback callback, |
| void* arg, |
| int autoRetryTimes, |
| CSendResult* result) { |
| if (producer == NULL || msg == NULL || callback == NULL || arg == NULL || result == NULL) { |
| return NULL_POINTER; |
| } |
| DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; |
| MQMessage* message = (MQMessage*)msg; |
| try { |
| // Constructing SelectMessageQueue objects through function pointer callback |
| SelectMessageQueue selectMessageQueue(callback); |
| SendResult sendResult = defaultMQProducer->send(*message, &selectMessageQueue, arg, autoRetryTimes); |
| // Convert SendStatus to CSendStatus |
| result->sendStatus = CSendStatus((int)sendResult.getSendStatus()); |
| result->offset = sendResult.getQueueOffset(); |
| strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); |
| result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; |
| } catch (exception& e) { |
| MQClientErrorContainer::setErr(string(e.what())); |
| return PRODUCER_SEND_ORDERLY_FAILED; |
| } |
| return OK; |
| } |
| |
| int SetProducerGroupName(CProducer* producer, const char* groupName) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setGroupName(groupName); |
| return OK; |
| } |
| int SetProducerInstanceName(CProducer* producer, const char* instanceName) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setInstanceName(instanceName); |
| return OK; |
| } |
| int SetProducerSessionCredentials(CProducer* producer, |
| const char* accessKey, |
| const char* secretKey, |
| const char* onsChannel) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setSessionCredentials(accessKey, secretKey, onsChannel); |
| return OK; |
| } |
| int SetProducerLogPath(CProducer* producer, const char* logPath) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| // Todo, This api should be implemented by core api. |
| //((DefaultMQProducer *) producer)->setLogFileSizeAndNum(3, 102400000); |
| return OK; |
| } |
| |
| int SetProducerLogFileNumAndSize(CProducer* producer, int fileNum, long fileSize) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setLogFileSizeAndNum(fileNum, fileSize); |
| return OK; |
| } |
| |
| int SetProducerLogLevel(CProducer* producer, CLogLevel level) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setLogLevel((elogLevel)level); |
| return OK; |
| } |
| |
| int SetProducerSendMsgTimeout(CProducer* producer, int timeout) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setSendMsgTimeout(timeout); |
| return OK; |
| } |
| |
| int SetProducerCompressMsgBodyOverHowmuch(CProducer* producer, int howmuch) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setCompressMsgBodyOverHowmuch(howmuch); |
| return OK; |
| } |
| |
| int SetProducerCompressLevel(CProducer* producer, int level) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setCompressLevel(level); |
| return OK; |
| } |
| |
| int SetProducerMaxMessageSize(CProducer* producer, int size) { |
| if (producer == NULL) { |
| return NULL_POINTER; |
| } |
| ((DefaultMQProducer*)producer)->setMaxMessageSize(size); |
| return OK; |
| } |
| #ifdef __cplusplus |
| }; |
| #endif |