blob: b048ef5a72e694a1ac20a5c9eaaed3fcaac6e6ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CProducer.h"
#include <string.h>
#include <typeindex>
#include <string.h>
#include <typeinfo>
#include "CBatchMessage.h"
#include "CCommon.h"
#include "CMQException.h"
#include "CMessage.h"
#include "CSendResult.h"
#include "ONSFactory.h"
#ifdef __cplusplus
extern "C" {
#endif
using namespace ons;
using namespace std;
class CSendCallback : public SendCallbackONS {
public:
CSendCallback(CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback) {
m_cSendSuccessCallback = cSendSuccessCallback;
m_cSendExceptionCallback = cSendExceptionCallback;
}
virtual ~CSendCallback() {}
virtual void onSuccess(SendResultONS &sendResult) {
CSendResult result;
result.sendStatus = E_SEND_OK;
result.offset = 0;
strncpy(result.msgId, sendResult.getMessageId(), MAX_MESSAGE_ID_LENGTH - 1);
result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
if (m_cSendSuccessCallback != NULL) {
m_cSendSuccessCallback(result);
}
}
virtual void onException(ONSClientException &e) {
CMQException exception;
exception.error = e.GetError();
exception.line = e.GetError();
strncpy(exception.msg, e.GetMsg(), MAX_EXEPTION_MSG_LENGTH - 1);
strncpy(exception.file, e.what(), MAX_EXEPTION_FILE_LENGTH - 1);
if (m_cSendExceptionCallback != NULL) {
m_cSendExceptionCallback(exception);
}
}
public:
void setM_cSendSuccessCallback(CSendSuccessCallback callback) {
m_cSendSuccessCallback = callback;
}
void setM_cSendExceptionCallback(CSendExceptionCallback callback) {
m_cSendExceptionCallback = callback;
}
private:
CSendSuccessCallback m_cSendSuccessCallback;
CSendExceptionCallback m_cSendExceptionCallback;
};
/**
 * Backing object behind the opaque CProducer handle handed to C callers.
 *
 * The pointer members carry explicit NULL defaults so that the
 * "innerProducer == NULL" checks used elsewhere in this file hold no matter
 * how an instance is created (not only with value-initialising "new T()").
 */
typedef struct __DefaultProducer__ {
    // Properties collected by the Set* functions and passed to the factory on start.
    ONSFactoryProperty factoryInfo;
    // Producer created in StartProducer; NULL until then.
    Producer *innerProducer = NULL;
    // Callback adapter shared by asynchronous sends; allocated in CreateProducer.
    CSendCallback *cSendCallback = NULL;
} DefaultProducer;
/**
 * Allocates a producer handle for the given group id.
 *
 * The handle is pre-populated with placeholder credentials ("AK"/"SK") and
 * the LOCAL channel; no underlying producer exists until StartProducer().
 *
 * @param groupId Group id to register; must not be NULL.
 * @return An opaque handle to release with DestroyProducer(), or NULL when
 *         groupId is NULL or construction fails.
 */
CProducer *CreateProducer(const char *groupId) {
    if (groupId == NULL) {
        return NULL;
    }
    DefaultProducer *defaultMQProducer = NULL;
    try {
        defaultMQProducer = new DefaultProducer();
        defaultMQProducer->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
        defaultMQProducer->factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "AK");
        defaultMQProducer->factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "SK");
        defaultMQProducer->factoryInfo.setOnsChannel(ONSChannel::LOCAL);
        // Kept as the last step so that a failure here leaves nothing but the
        // producer object itself to release.
        defaultMQProducer->cSendCallback = new CSendCallback(NULL, NULL);
    } catch (...) {
        // C callers cannot handle C++ exceptions: release the partially built
        // object and report the failure as a NULL handle instead.
        delete defaultMQProducer;
        return NULL;
    }
    return (CProducer *) defaultMQProducer;
}
/**
 * Releases a handle obtained from CreateProducer().
 *
 * Frees the callback adapter and the handle object itself; innerProducer is
 * not touched here.
 *
 * @return NULL_POINTER when pProducer is NULL, otherwise OK.
 */
int DestroyProducer(CProducer *pProducer) {
    if (pProducer == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *impl = reinterpret_cast<DefaultProducer *>(pProducer);
    // Deleting a NULL pointer is a no-op, so no separate guard is required.
    delete impl->cSendCallback;
    delete impl;
    return OK;
}
/**
 * Creates the underlying producer from the collected factory properties and
 * starts it.
 *
 * @return NULL_POINTER when producer is NULL, PRODUCER_START_FAILED when the
 *         factory yields no producer or any exception is raised, otherwise OK.
 */
int StartProducer(CProducer *producer) {
    if (producer == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
    try {
        defaultProducer->innerProducer = ONSFactory::getInstance()->createProducer(defaultProducer->factoryInfo);
        // Guard against a factory that signals failure with NULL rather than
        // by throwing.
        if (defaultProducer->innerProducer == NULL) {
            return PRODUCER_START_FAILED;
        }
        defaultProducer->innerProducer->start();
    } catch (...) {
        // Nothing may propagate across the C boundary, whatever its type.
        return PRODUCER_START_FAILED;
    }
    return OK;
}
/**
 * Shuts down the underlying producer.
 *
 * @return NULL_POINTER when producer is NULL or no underlying producer has
 *         been created yet (StartProducer not called), otherwise OK.
 */
int ShutdownProducer(CProducer *producer) {
    if (producer == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
    // innerProducer is only assigned in StartProducer; without this check a
    // shutdown before start would dereference NULL.
    if (defaultProducer->innerProducer == NULL) {
        return NULL_POINTER;
    }
    defaultProducer->innerProducer->shutdown();
    return OK;
}
/**
 * Stores the name server address under the NAMESRV_ADDR factory property.
 *
 * @return NULL_POINTER when producer or namesrv is NULL, otherwise OK.
 */
int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
    // The value is validated as well, mirroring the groupId check in
    // CreateProducer, so a NULL string never reaches the property store.
    if (producer == NULL || namesrv == NULL) {
        return NULL_POINTER;
    }
    ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, namesrv);
    return OK;
}
/**
 * Stores the name server domain under the ONSAddr factory property.
 *
 * @return NULL_POINTER when producer or domain is NULL, otherwise OK.
 */
int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
    // The value is validated as well, mirroring the groupId check in
    // CreateProducer, so a NULL string never reaches the property store.
    if (producer == NULL || domain == NULL) {
        return NULL_POINTER;
    }
    ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, domain);
    return OK;
}
/**
 * Sends a message synchronously and reports the outcome through *result.
 *
 * On success sendStatus is set to E_SEND_OK, offset to 0, and msgId to the
 * (possibly truncated, always NUL-terminated) message id.
 *
 * @return NULL_POINTER when any argument is NULL or the producer has not been
 *         started, PRODUCER_SEND_SYNC_FAILED when the send raises an
 *         exception, otherwise OK.
 */
int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
    if (producer == NULL || msg == NULL || result == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *impl = (DefaultProducer *) producer;
    if (impl->innerProducer == NULL) {
        return NULL_POINTER;
    }
    try {
        Message *onsMessage = (Message *) msg;
        SendResultONS outcome = impl->innerProducer->send(*onsMessage);
        result->sendStatus = E_SEND_OK;
        result->offset = 0;
        // Copy at most LENGTH - 1 bytes, then terminate explicitly.
        strncpy(result->msgId, outcome.getMessageId(), MAX_MESSAGE_ID_LENGTH - 1);
        result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
    } catch (exception &) {
        return PRODUCER_SEND_SYNC_FAILED;
    }
    return OK;
}
/**
 * Batch sending entry point; not implemented yet.
 *
 * @return NULL_POINTER when any argument is NULL, otherwise NOT_SUPPORT_NOW.
 */
int SendBatchMessage(CProducer *producer, CBatchMessage *batcMsg, CSendResult *result) {
    const bool allArgumentsPresent = producer != NULL && batcMsg != NULL && result != NULL;
    if (!allArgumentsPresent) {
        return NULL_POINTER;
    }
    return NOT_SUPPORT_NOW;
}
/**
 * Sends a message asynchronously; the outcome is delivered through the given
 * C callbacks.
 *
 * The callbacks are installed on the producer's single shared callback
 * adapter, so each call replaces the callbacks set by the previous one.
 *
 * @return NULL_POINTER when any argument is NULL, the producer has not been
 *         started, or the callback adapter is missing;
 *         PRODUCER_SEND_ASYNC_FAILED when submitting the message raises an
 *         exception; otherwise OK.
 */
int SendMessageAsync(CProducer *producer,
                     CMessage *msg,
                     CSendSuccessCallback cSendSuccessCallback,
                     CSendExceptionCallback cSendExceptionCallback) {
    if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
    if (defaultProducer->innerProducer == NULL || defaultProducer->cSendCallback == NULL) {
        return NULL_POINTER;
    }
    Message *message = (Message *) msg;
    // Only the asynchronous send is issued here. A synchronous send() before
    // it would deliver the message twice and could throw outside the handler
    // below.
    try {
        defaultProducer->cSendCallback->setM_cSendSuccessCallback(cSendSuccessCallback);
        defaultProducer->cSendCallback->setM_cSendExceptionCallback(cSendExceptionCallback);
        defaultProducer->innerProducer->sendAsync(*message, defaultProducer->cSendCallback);
    } catch (exception &e) {
        // Client exceptions raised while submitting are also reported through
        // the exception callback; other exception types only yield the error
        // code.
        if (std::type_index(typeid(e)) == std::type_index(typeid(ONSClientException))) {
            ONSClientException &mqe = (ONSClientException &) e;
            defaultProducer->cSendCallback->onException(mqe);
        }
        return PRODUCER_SEND_ASYNC_FAILED;
    }
    return OK;
}
/**
 * Sends a message in one-way mode through the underlying producer's
 * sendOneway(); no send result is reported back to the caller.
 *
 * @return NULL_POINTER when an argument is NULL or the producer has not been
 *         started, PRODUCER_SEND_ONEWAY_FAILED when the send raises an
 *         exception, otherwise OK.
 */
int SendMessageOneway(CProducer *producer, CMessage *msg) {
    if (producer == NULL || msg == NULL) {
        return NULL_POINTER;
    }
    Producer *inner = ((DefaultProducer *) producer)->innerProducer;
    if (inner == NULL) {
        return NULL_POINTER;
    }
    Message *onsMessage = (Message *) msg;
    try {
        inner->sendOneway(*onsMessage);
        return OK;
    } catch (exception &) {
        return PRODUCER_SEND_ONEWAY_FAILED;
    }
}
/**
 * Ordered one-way sending entry point; not implemented yet.
 *
 * The selector and arg parameters are currently unused.
 *
 * @return NULL_POINTER when producer or msg is NULL or the producer has not
 *         been started, otherwise NOT_SUPPORT_NOW.
 */
int SendMessageOnewayOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback selector, void *arg) {
    if (producer == NULL || msg == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
    if (defaultProducer->innerProducer == NULL) {
        return NULL_POINTER;
    }
    // No send is performed, so there is nothing that could throw and no
    // message object is needed.
    return NOT_SUPPORT_NOW;
}
/**
 * Ordered asynchronous sending entry point; not implemented yet.
 *
 * arg is not validated here.
 *
 * @return NULL_POINTER when producer, msg, callback or either send callback is
 *         NULL, otherwise NOT_SUPPORT_NOW.
 */
int SendMessageOrderlyAsync(CProducer *producer,
                            CMessage *msg,
                            QueueSelectorCallback callback,
                            void *arg,
                            CSendSuccessCallback cSendSuccessCallback,
                            CSendExceptionCallback cSendExceptionCallback) {
    const bool hasTargets = producer != NULL && msg != NULL && callback != NULL;
    const bool hasCallbacks = cSendSuccessCallback != NULL && cSendExceptionCallback != NULL;
    if (hasTargets && hasCallbacks) {
        return NOT_SUPPORT_NOW;
    }
    return NULL_POINTER;
}
/**
 * Ordered synchronous sending entry point; not implemented yet.
 *
 * autoRetryTimes is not consulted.
 *
 * @return NULL_POINTER when producer, msg, callback, arg or result is NULL,
 *         otherwise NOT_SUPPORT_NOW.
 */
int SendMessageOrderly(CProducer *producer,
                       CMessage *msg,
                       QueueSelectorCallback callback,
                       void *arg,
                       int autoRetryTimes,
                       CSendResult *result) {
    const bool allPointersSet =
        producer != NULL && msg != NULL && callback != NULL && arg != NULL && result != NULL;
    if (allPointersSet) {
        return NOT_SUPPORT_NOW;
    }
    return NULL_POINTER;
}
/**
 * Overrides the group id stored under the GroupId factory property.
 *
 * @return NULL_POINTER when producer or groupName is NULL, otherwise OK.
 */
int SetProducerGroupName(CProducer *producer, const char *groupName) {
    // CreateProducer rejects a NULL group id; apply the same rule here so a
    // NULL string never reaches the property store.
    if (producer == NULL || groupName == NULL) {
        return NULL_POINTER;
    }
    ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupName);
    return OK;
}
/**
 * Stores the given instance name in the factory properties.
 *
 * @return NULL_POINTER when producer or instanceName is NULL, otherwise OK.
 */
int SetProducerInstanceName(CProducer *producer, const char *instanceName) {
    // Reject a NULL value, mirroring the groupId check in CreateProducer.
    if (producer == NULL || instanceName == NULL) {
        return NULL_POINTER;
    }
    // NOTE(review): the name is written under the ConsumeThreadNums key, which
    // does not look like an instance-name property — confirm the intended key
    // against ONSFactoryProperty before relying on this setter.
    ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumeThreadNums, instanceName);
    return OK;
}
/**
 * Stores the access key and secret key in the factory properties.
 *
 * onsChannel is accepted for interface compatibility but is not applied.
 *
 * @return NULL_POINTER when producer, accessKey or secretKey is NULL,
 *         otherwise OK.
 */
int SetProducerSessionCredentials(CProducer *producer,
                                  const char *accessKey,
                                  const char *secretKey,
                                  const char *onsChannel) {
    // Validate both keys before writing either, so a bad call never leaves a
    // half-updated credential pair behind.
    if (producer == NULL || accessKey == NULL || secretKey == NULL) {
        return NULL_POINTER;
    }
    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
    defaultProducer->factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, accessKey);
    defaultProducer->factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, secretKey);
    return OK;
}
/**
 * Stores the log path under the LogPath factory property.
 *
 * @return NULL_POINTER when producer or logPath is NULL, otherwise OK.
 */
int SetProducerLogPath(CProducer *producer, const char *logPath) {
    // Reject a NULL value, mirroring the groupId check in CreateProducer.
    if (producer == NULL || logPath == NULL) {
        return NULL_POINTER;
    }
    ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::LogPath, logPath);
    return OK;
}
/**
 * Accepts log rotation settings; fileNum and fileSize are currently ignored.
 *
 * @return NULL_POINTER when producer is NULL, otherwise OK.
 */
int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize) {
    return (producer != NULL) ? OK : NULL_POINTER;
}
/**
 * Accepts a log level; the level is currently not forwarded to the factory
 * properties.
 *
 * @return NULL_POINTER when producer is NULL, otherwise OK.
 */
int SetProducerLogLevel(CProducer *producer, CLogLevel level) {
    return (producer != NULL) ? OK : NULL_POINTER;
}
/**
 * Passes the send timeout to the factory properties via setSendMsgTimeout().
 *
 * @return NULL_POINTER when producer is NULL, otherwise OK.
 */
int SetProducerSendMsgTimeout(CProducer *producer, int timeout) {
    if (producer != NULL) {
        DefaultProducer *impl = (DefaultProducer *) producer;
        impl->factoryInfo.setSendMsgTimeout(timeout);
        return OK;
    }
    return NULL_POINTER;
}
/**
 * Accepts a compression threshold; howmuch is currently ignored.
 *
 * @return NULL_POINTER when producer is NULL, otherwise OK.
 */
int SetProducerCompressMsgBodyOverHowmuch(CProducer *producer, int howmuch) {
    return (producer != NULL) ? OK : NULL_POINTER;
}
/**
 * Accepts a compression level; level is currently ignored.
 *
 * @return NULL_POINTER when producer is NULL, otherwise OK.
 */
int SetProducerCompressLevel(CProducer *producer, int level) {
    return (producer != NULL) ? OK : NULL_POINTER;
}
/**
 * Accepts a maximum message size; size is currently ignored.
 *
 * @return NULL_POINTER when producer is NULL, otherwise OK.
 */
int SetProducerMaxMessageSize(CProducer *producer, int size) {
    return (producer != NULL) ? OK : NULL_POINTER;
}
#ifdef __cplusplus
};
#endif