/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#if !defined(WIN32) && !defined(__APPLE__)
#include <sys/prctl.h>
#endif

#include "ConsumeMsgService.h"
#include "DefaultMQPushConsumer.h"
#include "Logging.h"
#include "MessageAccessor.h"
#include "StatsServerManager.h"
#include "UtilAll.h"

namespace rocketmq {
//<!************************************************************************
ConsumeMessageConcurrentlyService::ConsumeMessageConcurrentlyService(MQConsumer* consumer,
                                                                     int threadCount,
                                                                     MQMessageListener* msgListener)
    : m_pConsumer(consumer), m_pMessageListener(msgListener), m_ioServiceWork(m_ioService) {
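  // Temporarily rename the current thread so that the worker threads spawned
  // below inherit the name "ConsumeTP"; the original name is restored afterwards.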
#if !defined(WIN32) && !defined(__APPLE__)
  string taskName = UtilAll::getProcessName();
  prctl(PR_SET_NAME, "ConsumeTP", 0, 0, 0);
#endif
  for (int i = 0; i != threadCount; ++i) {
    m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
  }
#if !defined(WIN32) && !defined(__APPLE__)
  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
#endif
}
ConsumeMessageConcurrentlyService::~ConsumeMessageConcurrentlyService(void) {
  m_pConsumer = NULL;
  m_pMessageListener = NULL;
}
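
// The worker threads are already spawned in the constructor, so start() has
// nothing left to do.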
void ConsumeMessageConcurrentlyService::start() {}

void ConsumeMessageConcurrentlyService::shutdown() {
  stopThreadPool();
}

void ConsumeMessageConcurrentlyService::stopThreadPool() {
  m_ioService.stop();
  m_threadpool.join_all();
}

MessageListenerType ConsumeMessageConcurrentlyService::getConsumeMsgSerivceListenerType() {
  return m_pMessageListener->getMessageListenerType();
}
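
// Posts a ConsumeRequest onto the thread pool. The PullRequest is held as a
// weak_ptr: if a re-balance has already released it, the submission is skipped.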
void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
                                                             vector<MQMessageExt>& msgs) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released.");
    return;
  }
  if (request->isDropped()) {
    LOG_INFO("Pull request for %s is dropped and will be released in the next re-balance.",
             request->m_messageQueue.toString().c_str());
    return;
  }
  // Dropped requests returned above, so only the io_service state remains to check.
  if (!m_ioService.stopped()) {
    m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs));
  } else {
    LOG_INFO("IOService has been stopped, will not post ConsumeRequest for %s.",
             request->m_messageQueue.toString().c_str());
  }
}
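
// Re-submits messages after a delay of millis ms. A deadline_timer is
// allocated on the heap, armed on the io_service, and released again in the
// completion path (static_submitConsumeRequest / triggersubmitConsumeRequestLater).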
void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest> pullRequest,
                                                                  vector<MQMessageExt>& msgs,
                                                                  int millis) {
  if (msgs.empty()) {
    return;
  }
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released.");
    return;
  }
  if (request->isDropped()) {
    LOG_INFO("Pull request for %s is dropped and will be released in the next re-balance.",
             (request->m_messageQueue).toString().c_str());
    return;
  }
  if (!m_ioService.stopped()) {
    boost::asio::deadline_timer* t =
        new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis));
    t->async_wait(
        boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest), this, t, request, msgs));
    LOG_INFO("Submit messages to consumer [%s] later, sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
             millis);
  } else {
    LOG_INFO("IOService has been stopped, will not post delayed ConsumeRequest for %s.",
             request->m_messageQueue.toString().c_str());
  }
}
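
// Timer trampoline: recovers the service instance from the opaque context
// pointer and forwards the delayed consume request together with the timer,
// which the callee is responsible for deleting.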
void ConsumeMessageConcurrentlyService::static_submitConsumeRequest(void* context,
                                                                    boost::asio::deadline_timer* t,
                                                                    boost::weak_ptr<PullRequest> pullRequest,
                                                                    vector<MQMessageExt>& msgs) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released.");
    deleteAndZero(t);  // still release the timer to avoid leaking it
    return;
  }
  ConsumeMessageConcurrentlyService* pService = (ConsumeMessageConcurrentlyService*)context;
  if (pService) {
    pService->triggersubmitConsumeRequestLater(t, request, msgs);
  } else {
    deleteAndZero(t);
  }
}
void ConsumeMessageConcurrentlyService::triggersubmitConsumeRequestLater(boost::asio::deadline_timer* t,
                                                                         boost::weak_ptr<PullRequest> pullRequest,
                                                                         vector<MQMessageExt>& msgs) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released.");
    deleteAndZero(t);  // still release the timer to avoid leaking it
    return;
  }
  submitConsumeRequest(request, msgs);
  deleteAndZero(t);
}
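
// Core consume path: runs the registered listener over the pulled messages,
// fires the trace hooks when enabled, sends failed messages back to the broker
// (CLUSTERING only), and finally advances the consume offset.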
void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
                                                       vector<MQMessageExt>& msgs) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released.");
    return;
  }
  if (request->isDropped()) {
    LOG_WARN("The pull request for %s had been dropped before.", request->m_messageQueue.toString().c_str());
    request->clearAllMsgs();  // clear the queued messages to avoid a bad state
                              // when a dropped pull request returns to normal
    return;
  }
  if (msgs.empty()) {
    LOG_WARN("The message list of the pull result is empty, mq:%s", (request->m_messageQueue).toString().c_str());
    return;
  }
  ConsumeMessageContext consumeMessageContext;
  DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
  if (pConsumer == NULL) {
    // The cast must succeed before the consumer is dereferenced below.
    LOG_ERROR("Consumer is not a DefaultMQPushConsumerImpl, drop consume request for %s.",
              (request->m_messageQueue).toString().c_str());
    return;
  }
  std::string groupName = pConsumer->getGroupName();
  if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
    consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
    consumeMessageContext.setConsumerGroup(groupName);
    consumeMessageContext.setMessageQueue(request->m_messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false);
    consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
    pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
  }
  ConsumeStatus status = CONSUME_SUCCESS;
  if (m_pMessageListener != NULL) {
    resetRetryTopic(msgs);
    request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
    LOG_DEBUG("=====Receive Messages, Topic[%s], MsgId[%s], Body[%s], RetryTimes[%d]", msgs[0].getTopic().c_str(),
              msgs[0].getMsgId().c_str(), msgs[0].getBody().c_str(), msgs[0].getReconsumeTimes());
    if (m_pConsumer->isUseNameSpaceMode()) {
      MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
    }
    if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
      // When message trace is enabled, consume the messages one by one.
      for (size_t i = 0; i < msgs.size(); ++i) {
        LOG_DEBUG("=====Trace Receive Messages, Topic[%s], MsgId[%s], Body[%s], RetryTimes[%d]",
                  msgs[i].getTopic().c_str(), msgs[i].getMsgId().c_str(), msgs[i].getBody().c_str(),
                  msgs[i].getReconsumeTimes());
        std::vector<MQMessageExt> msgInner;
        msgInner.push_back(msgs[i]);
        if (status != CONSUME_SUCCESS) {
          // Once one message fails, all the messages behind it are marked as failed.
          status = RECONSUME_LATER;
          consumeMessageContext.setMsgIndex(i);
          consumeMessageContext.setStatus("RECONSUME_LATER");
          consumeMessageContext.setSuccess(false);
          pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
          continue;
        }
        uint64 startTimeStamp = UtilAll::currentTimeMillis();
        try {
          status = m_pMessageListener->consumeMessage(msgInner);
        } catch (...) {
          status = RECONSUME_LATER;
          LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
        }
        uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
                                                                                groupName, consumerRT);
        consumeMessageContext.setMsgIndex(i);  // indicates the message position; batch consume is not supported here
        if (status == CONSUME_SUCCESS) {
          consumeMessageContext.setStatus("CONSUME_SUCCESS");
          consumeMessageContext.setSuccess(true);
        } else {
          status = RECONSUME_LATER;
          consumeMessageContext.setStatus("RECONSUME_LATER");
          consumeMessageContext.setSuccess(false);
        }
        pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
      }
    } else {
      uint64 startTimeStamp = UtilAll::currentTimeMillis();
      try {
        status = m_pMessageListener->consumeMessage(msgs);
      } catch (...) {
        status = RECONSUME_LATER;
        LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
      }
      uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
                                                                              groupName, consumerRT, msgs.size());
    }
  }
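  // ackIndex marks how far consumption succeeded: msgs.size() means every
  // message was consumed, -1 means all of them must be retried.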
  int ackIndex = -1;
  switch (status) {
    case CONSUME_SUCCESS:
      ackIndex = msgs.size();
      break;
    case RECONSUME_LATER:
      ackIndex = -1;
      break;
    default:
      break;
  }
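  // Failure handling depends on the message model: under BROADCASTING failed
  // messages are dropped (retrying is left to the application), under
  // CLUSTERING they are sent back to the broker, with a local retry as the
  // fallback when the send-back fails.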
  std::vector<MQMessageExt> localRetryMsgs;
  switch (m_pConsumer->getMessageModel()) {
    case BROADCASTING: {
      // Note: re-consume under BROADCASTING should be done by the application,
      // since retrying via the broker would heavily impact the broker cluster.
      if (ackIndex != (int)msgs.size())
        LOG_WARN("BROADCASTING, the message consume failed, drop it:%s", (request->m_messageQueue).toString().c_str());
      break;
    }
    case CLUSTERING: {
      // Record the consume TPS stats.
      if (ackIndex == -1) {
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
            request->m_messageQueue.getTopic(), groupName, msgs.size());
      } else {
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
                                                                                   groupName, msgs.size());
      }
      // Send the failed messages back to the broker.
      for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
        LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
                  (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
                  msgs[i].getReconsumeTimes());
        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
          string brokerName = request->m_messageQueue.getBrokerName();
          if (m_pConsumer->isUseNameSpaceMode()) {
            MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace());
          }
          if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
            LOG_WARN("Send message back failed, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT
                     ", re-consume times is:%d",
                     (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
                     msgs[i].getReconsumeTimes());
            msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
            localRetryMsgs.push_back(msgs[i]);
          }
        }
      }
      break;
    }
    default:
      break;
  }
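  // Messages that could not be sent back remain in localRetryMsgs; remove them
  // from msgs (matched by queue offset) so that the offset update below does
  // not advance past messages that still await a local retry.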
  if (!localRetryMsgs.empty()) {
    LOG_ERROR("Launching client-side re-consume: message consumption failed and sending back to the broker failed as well.");
    for (std::vector<MQMessageExt>::iterator itOrigin = msgs.begin(); itOrigin != msgs.end();) {
      bool remove = false;
      for (std::vector<MQMessageExt>::iterator itRetry = localRetryMsgs.begin(); itRetry != localRetryMsgs.end();
           itRetry++) {
        if (itRetry->getQueueOffset() == itOrigin->getQueueOffset()) {
          remove = true;
          break;
        }
      }
      if (remove) {
        itOrigin = msgs.erase(itOrigin);
      } else {
        itOrigin++;
      }
    }
  }
  // Remove the handled messages from the process queue and persist the new
  // consume offset; messages kept for local retry stay in the queue.
  int64 offset = request->removeMessage(msgs);
  if (offset >= 0) {
    m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
  } else {
    LOG_WARN("Note: failed to get the local offset for mq:%s, it may have been updated before; skip..",
             (request->m_messageQueue).toString().c_str());
  }
  if (!localRetryMsgs.empty()) {
    LOG_INFO("Sending [" SIZET_FMT "] messages back to mq:%s failed, re-consume them locally after 1s.",
             localRetryMsgs.size(), (request->m_messageQueue).toString().c_str());
    submitConsumeRequestLater(request, localRetryMsgs, 1000);
  }
}
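
// Retry messages arrive on the consumer group's retry topic; restore the
// original topic saved in PROPERTY_RETRY_TOPIC before they reach the listener.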
void ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& msgs) {
  string groupTopic = UtilAll::getRetryTopic(m_pConsumer->getGroupName());
  for (size_t i = 0; i < msgs.size(); i++) {
    MQMessageExt& msg = msgs[i];
    string retryTopic = msg.getProperty(MQMessage::PROPERTY_RETRY_TOPIC);
    if (!retryTopic.empty() && groupTopic.compare(msg.getTopic()) == 0) {
      msg.setTopic(retryTopic);
    }
  }
}
//<!***************************************************************************
}  // namespace rocketmq