/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "StatsServer.h"
#include <memory>
#include <string>
#include "Logging.h"
#include "RocketMQClient.h"
#include "StatsItem.h"
namespace rocketmq {
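
// Names of the metrics tracked for each "topic@group" statistics key.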
const std::string StatsServer::TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
const std::string StatsServer::TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
const std::string StatsServer::TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
const std::string StatsServer::TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
const std::string StatsServer::TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
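
// Length of one sampling window, in seconds. TPS values are computed by
// dividing per-window counters by this period.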
const int StatsServer::SAMPLING_PERIOD = 10;
StatsServer::StatsServer() : m_status(CREATE_JUST), serverName("default") {}

StatsServer::StatsServer(std::string sName) : m_status(CREATE_JUST), serverName(sName) {}
StatsServer::~StatsServer() {}
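
// Transitions CREATE_JUST -> RUNNING and launches the background sampling
// thread. Calling start() again after that is a no-op.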
void StatsServer::start() {
  switch (m_status) {
    case CREATE_JUST:
      m_status = START_FAILED;
      startScheduledTask();
      LOG_INFO("Default Stats Service Started.");
      m_status = RUNNING;
      break;
    case RUNNING:
    case SHUTDOWN_ALREADY:
    case START_FAILED:
      break;
    default:
      break;
  }
}
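
// Stops the sampling thread and moves the server to SHUTDOWN_ALREADY.
// Subsequent calls are no-ops.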
void StatsServer::shutdown() {
  switch (m_status) {
    case CREATE_JUST:
    case RUNNING:
      LOG_INFO("Default Stats Service Shutdown.");
      stopScheduledTask();
      m_status = SHUTDOWN_ALREADY;
      break;
    case SHUTDOWN_ALREADY:
    case START_FAILED:
      break;
    default:
      break;
  }
}
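
// Returns the most recently sampled statistics for "topic@groupName", or a
// default-constructed ConsumeStats when the server is not running.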
ConsumeStats StatsServer::getConsumeStats(std::string topic, std::string groupName) {
  ConsumeStats consumeStats;
  LOG_DEBUG("getConsumeStats Topic:%s, Group:%s", topic.c_str(), groupName.c_str());
  if (m_status == RUNNING) {
    std::string key = topic + "@" + groupName;
    LOG_DEBUG("getConsumeStats Key:%s", key.c_str());
    return getConsumeStats(key);
  }
  return consumeStats;
}
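
// The inc* methods below accumulate raw counters for the current sampling
// window. std::map::operator[] default-constructs the StatsItem on first
// access, so no explicit existence check is needed.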
void StatsServer::incPullRT(std::string topic, std::string groupName, uint64 rt) {
  std::string key = topic + "@" + groupName;
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  StatsItem& item = m_consumeStatsItems[key];
  item.pullRT += rt;
  item.pullRTCount += 1;
}
void StatsServer::incPullTPS(std::string topic, std::string groupName, uint64 msgCount) {
  std::string key = topic + "@" + groupName;
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  StatsItem& item = m_consumeStatsItems[key];
  item.pullCount += msgCount;
}
void StatsServer::incConsumeRT(std::string topic, std::string groupName, uint64 rt, uint64 msgCount) {
  std::string key = topic + "@" + groupName;
  LOG_DEBUG("incConsumeRT Before Key:%s, RT:%llu, Count: %llu", key.c_str(), rt, msgCount);
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  StatsItem& item = m_consumeStatsItems[key];
  item.consumeRT += rt;
  item.consumeRTCount += msgCount;
  LOG_DEBUG("incConsumeRT After Key:%s, RT:%llu, Count: %llu", key.c_str(), item.consumeRT, item.consumeRTCount);
}
void StatsServer::incConsumeOKTPS(std::string topic, std::string groupName, uint64 msgCount) {
  std::string key = topic + "@" + groupName;
  LOG_DEBUG("incConsumeOKTPS Before Key:%s, Count: %llu", key.c_str(), msgCount);
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  StatsItem& item = m_consumeStatsItems[key];
  item.consumeOKCount += msgCount;
  LOG_DEBUG("incConsumeOKTPS After Key:%s, Count: %llu", key.c_str(), item.consumeOKCount);
}
void StatsServer::incConsumeFailedTPS(std::string topic, std::string groupName, uint64 msgCount) {
  std::string key = topic + "@" + groupName;
  LOG_DEBUG("incConsumeFailedTPS Key:%s, Count: %llu", key.c_str(), msgCount);
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  StatsItem& item = m_consumeStatsItems[key];
  // Failures feed both the per-window TPS counter and the cumulative total.
  item.consumeFailedCount += msgCount;
  item.consumeFailedMsgs += msgCount;
}
void StatsServer::incConsumeFailedMsgs(std::string topic, std::string groupName, uint64 msgCount) {
  std::string key = topic + "@" + groupName;
  LOG_DEBUG("incConsumeFailedMsgs Key:%s, Count: %llu", key.c_str(), msgCount);
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  StatsItem& item = m_consumeStatsItems[key];
  item.consumeFailedMsgs += msgCount;
}
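
// Spawns the worker thread that runs the asio loop driving the sampling timer.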
void StatsServer::startScheduledTask() {
  m_consumer_status_service_thread.reset(new boost::thread(boost::bind(&StatsServer::doStartScheduledTask, this)));
}
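
// Stops the asio loop and joins the sampling thread, if one was started.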
void StatsServer::stopScheduledTask() {
  if (m_consumer_status_service_thread) {
    m_consumerStatus_ioService.stop();
    m_consumer_status_service_thread->interrupt();
    m_consumer_status_service_thread->join();
    m_consumer_status_service_thread.reset();
  }
}
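
// Thread entry point: arms the first sampling timer, then blocks in run()
// until stopScheduledTask() stops the io_service.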
void StatsServer::doStartScheduledTask() {
  // The work object keeps run() from returning while no handler is pending.
  boost::asio::io_service::work work(m_consumerStatus_ioService);
  boost::system::error_code ec1;
  std::shared_ptr<boost::asio::deadline_timer> t1 = std::make_shared<boost::asio::deadline_timer>(
      m_consumerStatus_ioService, boost::posix_time::seconds(SAMPLING_PERIOD));
  // Note: ec1 is bound by value, so the handler receives this default
  // error_code rather than the actual async_wait result.
  t1->async_wait(boost::bind(&StatsServer::scheduledTaskInSeconds, this, ec1, t1));
  boost::system::error_code errorCode;
  m_consumerStatus_ioService.run(errorCode);
}
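
// Timer handler: takes one sample, then re-arms the timer so sampling repeats
// every SAMPLING_PERIOD seconds.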
void StatsServer::scheduledTaskInSeconds(boost::system::error_code& ec,
                                         std::shared_ptr<boost::asio::deadline_timer> t) {
  samplingInSeconds();
  boost::system::error_code e;
  // Advance from the previous expiry rather than from "now" so the interval
  // does not drift by the handler's own execution time.
  t->expires_at(t->expires_at() + boost::posix_time::seconds(SAMPLING_PERIOD), e);
  t->async_wait(boost::bind(&StatsServer::scheduledTaskInSeconds, this, ec, t));
}
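
// Converts the raw counters accumulated over the last window into averaged RT
// and TPS values, resets the per-window counters, and publishes the result.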
void StatsServer::samplingInSeconds() {
  LOG_DEBUG("samplingInSeconds==");
  // do samplings
  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
  for (auto& entry : m_consumeStatsItems) {
    StatsItem& item = entry.second;
    ConsumeStats consumeStats;
    if (item.pullRTCount != 0) {
      consumeStats.pullRT = (1.0 * item.pullRT) / item.pullRTCount;
    }
    item.pullRT = 0;
    item.pullRTCount = 0;
    consumeStats.pullTPS = (1.0 * item.pullCount) / SAMPLING_PERIOD;
    item.pullCount = 0;
    if (item.consumeRTCount != 0) {
      consumeStats.consumeRT = (1.0 * item.consumeRT) / item.consumeRTCount;
      LOG_DEBUG("samplingInSeconds Key[%s], consumeRT:%.2f, Total RT:%llu, Count: %llu", entry.first.c_str(),
                consumeStats.consumeRT, item.consumeRT, item.consumeRTCount);
    }
    item.consumeRT = 0;
    item.consumeRTCount = 0;
    consumeStats.consumeOKTPS = (1.0 * item.consumeOKCount) / SAMPLING_PERIOD;
    LOG_DEBUG("samplingInSeconds Key[%s], consumeOKTPS:%.2f, Count: %llu", entry.first.c_str(),
              consumeStats.consumeOKTPS, item.consumeOKCount);
    item.consumeOKCount = 0;
    consumeStats.consumeFailedTPS = (1.0 * item.consumeFailedCount) / SAMPLING_PERIOD;
    // Log before the reset so the count reflects this window's failures.
    LOG_DEBUG("samplingInSeconds Key[%s], consumeFailedTPS:%.2f, Count: %llu", entry.first.c_str(),
              consumeStats.consumeFailedTPS, item.consumeFailedCount);
    item.consumeFailedCount = 0;
    // consumeFailedMsgs accumulates across windows; the per-window reset is
    // left disabled.
    // item.consumeFailedMsgs = 0;
    consumeStats.consumeFailedMsgs = item.consumeFailedMsgs;
    updateConsumeStats(entry.first, consumeStats);
  }
}
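
// Publishes one window's ConsumeStats under the "topic@groupName" key.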
void StatsServer::updateConsumeStats(std::string topic, std::string groupName, ConsumeStats consumeStats) {
  if (m_status == RUNNING) {
    std::string key = topic + "@" + groupName;
    updateConsumeStats(key, consumeStats);
  }
}

void StatsServer::updateConsumeStats(std::string key, ConsumeStats consumeStats) {
  LOG_DEBUG("updateConsumeStats Key:%s, consumeOKTPS: %.2f", key.c_str(), consumeStats.consumeOKTPS);
  std::lock_guard<std::mutex> lock(m_consumeStatusMutex);
  m_consumeStatus[key] = consumeStats;
}
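
// Looks up the published stats for a fully formed "topic@groupName" key,
// returning default values when the key has not been sampled yet.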
ConsumeStats StatsServer::getConsumeStats(std::string key) {
  std::lock_guard<std::mutex> lock(m_consumeStatusMutex);
  auto it = m_consumeStatus.find(key);
  if (it != m_consumeStatus.end()) {
    return it->second;
  }
  return ConsumeStats();
}
} // namespace rocketmq