blob: 2c35c7491763de77026bfd0eecefa9e42a6eaceb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DefaultMQPushConsumer.h"
#include "CMessageExt.h"
#include "CPushConsumer.h"
#include "CCommon.h"
#include <map>
using namespace rocketmq;
using namespace std;
class MessageListenerInner : public MessageListenerConcurrently {
public:
MessageListenerInner() {}
MessageListenerInner(CPushConsumer *consumer, MessageCallBack pCallback) {
m_pconsumer = consumer;
m_pMsgReceiveCallback = pCallback;
}
~MessageListenerInner() {}
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
//to do user call back
if (m_pMsgReceiveCallback == NULL) {
return RECONSUME_LATER;
}
for (size_t i = 0; i < msgs.size(); ++i) {
MQMessageExt *msg = const_cast<MQMessageExt *>(&msgs[i]);
CMessageExt *message = (CMessageExt *) (msg);
if (m_pMsgReceiveCallback(m_pconsumer, message) != E_CONSUME_SUCCESS)
return RECONSUME_LATER;
}
return CONSUME_SUCCESS;
}
private:
MessageCallBack m_pMsgReceiveCallback;
CPushConsumer *m_pconsumer;
};
class MessageListenerOrderlyInner : public MessageListenerOrderly {
public:
MessageListenerOrderlyInner(CPushConsumer *consumer, MessageCallBack pCallback) {
m_pconsumer = consumer;
m_pMsgReceiveCallback = pCallback;
}
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
if (m_pMsgReceiveCallback == NULL) {
return RECONSUME_LATER;
}
for (size_t i = 0; i < msgs.size(); ++i) {
MQMessageExt *msg = const_cast<MQMessageExt *>(&msgs[i]);
CMessageExt *message = (CMessageExt *) (msg);
if (m_pMsgReceiveCallback(m_pconsumer, message) != E_CONSUME_SUCCESS)
return RECONSUME_LATER;
}
return CONSUME_SUCCESS;
}
private:
MessageCallBack m_pMsgReceiveCallback;
CPushConsumer *m_pconsumer;
};
map<CPushConsumer *, MessageListenerInner *> g_ListenerMap;
map<CPushConsumer *, MessageListenerOrderlyInner *> g_OrderListenerMap;
#ifdef __cplusplus
extern "C" {
#endif
CPushConsumer *CreatePushConsumer(const char *groupId) {
if (groupId == NULL) {
return NULL;
}
DefaultMQPushConsumer *defaultMQPushConsumer = new DefaultMQPushConsumer(groupId);
defaultMQPushConsumer->setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);
return (CPushConsumer *) defaultMQPushConsumer;
}
int DestroyPushConsumer(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
delete reinterpret_cast<DefaultMQPushConsumer * >(consumer);
return OK;
}
int StartPushConsumer(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
try {
((DefaultMQPushConsumer *) consumer)->start();
} catch (exception &e) {
return PUSHCONSUMER_START_FAILED;
}
return OK;
}
int ShutdownPushConsumer(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->shutdown();
return OK;
}
int SetPushConsumerGroupID(CPushConsumer *consumer, const char *groupId) {
if (consumer == NULL || groupId == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setGroupName(groupId);
return OK;
}
const char *GetPushConsumerGroupID(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL;
}
return ((DefaultMQPushConsumer *) consumer)->getGroupName().c_str();
}
int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setNamesrvAddr(namesrv);
return OK;
}
int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setNamesrvDomain(domain);
return OK;
}
int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->subscribe(topic, expression);
return OK;
}
int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback) {
if (consumer == NULL || pCallback == NULL) {
return NULL_POINTER;
}
MessageListenerInner *listenerInner = new MessageListenerInner(consumer, pCallback);
((DefaultMQPushConsumer *) consumer)->registerMessageListener(listenerInner);
g_ListenerMap[consumer] = listenerInner;
return OK;
}
int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback) {
if (consumer == NULL || pCallback == NULL) {
return NULL_POINTER;
}
MessageListenerOrderlyInner *messageListenerOrderlyInner = new MessageListenerOrderlyInner(consumer, pCallback);
((DefaultMQPushConsumer *) consumer)->registerMessageListener(messageListenerOrderlyInner);
g_OrderListenerMap[consumer] = messageListenerOrderlyInner;
return OK;
}
int UnregisterMessageCallbackOrderly(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
map<CPushConsumer *, MessageListenerOrderlyInner *>::iterator iter;
iter = g_OrderListenerMap.find(consumer);
if (iter != g_OrderListenerMap.end()) {
MessageListenerOrderlyInner *listenerInner = iter->second;
if (listenerInner != NULL) {
delete listenerInner;
}
g_OrderListenerMap.erase(iter);
}
return OK;
}
int UnregisterMessageCallback(CPushConsumer *consumer) {
if (consumer == NULL) {
return NULL_POINTER;
}
map<CPushConsumer *, MessageListenerInner *>::iterator iter;
iter = g_ListenerMap.find(consumer);
if (iter != g_ListenerMap.end()) {
MessageListenerInner *listenerInner = iter->second;
if (listenerInner != NULL) {
delete listenerInner;
}
g_ListenerMap.erase(iter);
}
return OK;
}
int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel messageModel) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setMessageModel(MessageModel((int) messageModel));
return OK;
}
int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount) {
if (consumer == NULL || threadCount == 0) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setConsumeThreadCount(threadCount);
return OK;
}
int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize) {
if (consumer == NULL || batchSize == 0) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setConsumeMessageBatchMaxSize(batchSize);
return OK;
}
int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setInstanceName(instanceName);
return OK;
}
int SetPushConsumerSessionCredentials(CPushConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setSessionCredentials(accessKey, secretKey, channel);
return OK;
}
int SetPushConsumerLogPath(CPushConsumer *consumer, const char *logPath) {
if (consumer == NULL) {
return NULL_POINTER;
}
//Todo, This api should be implemented by core api.
//((DefaultMQPushConsumer *) consumer)->setInstanceName(instanceName);
return OK;
}
int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long fileSize) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setLogFileSizeAndNum(fileNum, fileSize);
return OK;
}
int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPushConsumer *) consumer)->setLogLevel((elogLevel) level);
return OK;
}
#ifdef __cplusplus
};
#endif