blob: 198c11e7eb66d3226e8af2f245fde73ddb06131d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CPullConsumer.h"
#include "CCommon.h"
#include "CMessageExt.h"
#include "DefaultMQPullConsumer.h"
#include "MQClientErrorContainer.h"
using namespace rocketmq;
using namespace std;
#ifdef __cplusplus
extern "C" {
#endif
CPullConsumer* CreatePullConsumer(const char* groupId) {
if (groupId == NULL) {
return NULL;
}
DefaultMQPullConsumer* defaultMQPullConsumer = new DefaultMQPullConsumer(groupId);
return (CPullConsumer*)defaultMQPullConsumer;
}
int DestroyPullConsumer(CPullConsumer* consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
delete reinterpret_cast<DefaultMQPullConsumer*>(consumer);
return OK;
}
int StartPullConsumer(CPullConsumer* consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
try {
((DefaultMQPullConsumer*)consumer)->start();
} catch (exception& e) {
MQClientErrorContainer::setErr(string(e.what()));
return PULLCONSUMER_START_FAILED;
}
return OK;
}
int ShutdownPullConsumer(CPullConsumer* consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->shutdown();
return OK;
}
int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId) {
if (consumer == NULL || groupId == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->setGroupName(groupId);
return OK;
}
const char* GetPullConsumerGroupID(CPullConsumer* consumer) {
if (consumer == NULL) {
return NULL;
}
return ((DefaultMQPullConsumer*)consumer)->getGroupName().c_str();
}
int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->setNamesrvAddr(namesrv);
return OK;
}
int SetPullConsumerNameServerDomain(CPullConsumer* consumer, const char* domain) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->setNamesrvDomain(domain);
return OK;
}
int SetPullConsumerSessionCredentials(CPullConsumer* consumer,
const char* accessKey,
const char* secretKey,
const char* channel) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->setSessionCredentials(accessKey, secretKey, channel);
return OK;
}
int SetPullConsumerLogPath(CPullConsumer* consumer, const char* logPath) {
if (consumer == NULL) {
return NULL_POINTER;
}
// Todo, This api should be implemented by core api.
//((DefaultMQPullConsumer *) consumer)->setInstanceName(instanceName);
return OK;
}
int SetPullConsumerLogFileNumAndSize(CPullConsumer* consumer, int fileNum, long fileSize) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->setLogFileSizeAndNum(fileNum, fileSize);
return OK;
}
int SetPullConsumerLogLevel(CPullConsumer* consumer, CLogLevel level) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer*)consumer)->setLogLevel((elogLevel)level);
return OK;
}
int FetchSubscriptionMessageQueues(CPullConsumer* consumer, const char* topic, CMessageQueue** mqs, int* size) {
if (consumer == NULL) {
return NULL_POINTER;
}
unsigned int index = 0;
CMessageQueue* temMQ = NULL;
std::vector<MQMessageQueue> fullMQ;
try {
((DefaultMQPullConsumer*)consumer)->fetchSubscribeMessageQueues(topic, fullMQ);
*size = fullMQ.size();
// Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed.
// Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time.
temMQ = (CMessageQueue*)malloc(*size * sizeof(CMessageQueue));
if (temMQ == NULL) {
*size = 0;
*mqs = NULL;
return MALLOC_FAILED;
}
auto iter = fullMQ.begin();
for (index = 0; iter != fullMQ.end() && index <= fullMQ.size(); ++iter, index++) {
strncpy(temMQ[index].topic, iter->getTopic().c_str(), MAX_TOPIC_LENGTH - 1);
strncpy(temMQ[index].brokerName, iter->getBrokerName().c_str(), MAX_BROKER_NAME_ID_LENGTH - 1);
temMQ[index].queueId = iter->getQueueId();
}
*mqs = temMQ;
} catch (MQException& e) {
*size = 0;
*mqs = NULL;
MQClientErrorContainer::setErr(string(e.what()));
return PULLCONSUMER_FETCH_MQ_FAILED;
}
return OK;
}
int ReleaseSubscriptionMessageQueue(CMessageQueue* mqs) {
if (mqs == NULL) {
return NULL_POINTER;
}
free((void*)mqs);
mqs = NULL;
return OK;
}
CPullResult Pull(CPullConsumer* consumer,
const CMessageQueue* mq,
const char* subExpression,
long long offset,
int maxNums) {
CPullResult pullResult;
memset(&pullResult, 0, sizeof(CPullResult));
MQMessageQueue messageQueue(mq->topic, mq->brokerName, mq->queueId);
PullResult cppPullResult;
try {
cppPullResult = ((DefaultMQPullConsumer*)consumer)->pull(messageQueue, subExpression, offset, maxNums);
} catch (exception& e) {
MQClientErrorContainer::setErr(string(e.what()));
cppPullResult.pullStatus = BROKER_TIMEOUT;
}
if (cppPullResult.pullStatus != BROKER_TIMEOUT) {
pullResult.maxOffset = cppPullResult.maxOffset;
pullResult.minOffset = cppPullResult.minOffset;
pullResult.nextBeginOffset = cppPullResult.nextBeginOffset;
}
switch (cppPullResult.pullStatus) {
case FOUND: {
pullResult.pullStatus = E_FOUND;
pullResult.size = cppPullResult.msgFoundList.size();
PullResult* tmpPullResult = new PullResult(cppPullResult);
pullResult.pData = tmpPullResult;
// Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core.
// Thus, this memory should be released by users using @ReleasePullResult
pullResult.msgFoundList = (CMessageExt**)malloc(pullResult.size * sizeof(CMessageExt*));
for (size_t i = 0; i < cppPullResult.msgFoundList.size(); i++) {
MQMessageExt* msg = const_cast<MQMessageExt*>(&tmpPullResult->msgFoundList[i]);
pullResult.msgFoundList[i] = (CMessageExt*)(msg);
}
break;
}
case NO_NEW_MSG: {
pullResult.pullStatus = E_NO_NEW_MSG;
break;
}
case NO_MATCHED_MSG: {
pullResult.pullStatus = E_NO_MATCHED_MSG;
break;
}
case OFFSET_ILLEGAL: {
pullResult.pullStatus = E_OFFSET_ILLEGAL;
break;
}
case BROKER_TIMEOUT: {
pullResult.pullStatus = E_BROKER_TIMEOUT;
break;
}
default:
pullResult.pullStatus = E_NO_NEW_MSG;
break;
}
return pullResult;
}
int ReleasePullResult(CPullResult pullResult) {
if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) {
return NULL_POINTER;
}
if (pullResult.pData != NULL) {
try {
delete ((PullResult*)pullResult.pData);
} catch (exception& e) {
MQClientErrorContainer::setErr(string(e.what()));
return NULL_POINTER;
}
}
free((void*)pullResult.msgFoundList);
pullResult.msgFoundList = NULL;
return OK;
}
#ifdef __cplusplus
};
#endif