blob: ba4773ae679355cced2d8bc4579aed0ec92b291d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.stat;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.common.stats.StatsSnapshot;
/**
 * Client-side holder for consumer statistics, keyed by topic and consumer group.
 *
 * <p>Five {@link StatsItemSet} instances are kept, one per metric name
 * ({@code CONSUME_OK_TPS}, {@code CONSUME_FAILED_TPS}, {@code CONSUME_RT},
 * {@code PULL_TPS}, {@code PULL_RT}). Every sample is stored under the key
 * {@code topic + "@" + group}.
 */
public class ConsumerStatsManager {
    private static final InternalLogger log = ClientLogger.getLog();

    private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
    private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
    private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
    private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
    private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";

    private final StatsItemSet topicAndGroupConsumeOKTPS;
    private final StatsItemSet topicAndGroupConsumeRT;
    private final StatsItemSet topicAndGroupConsumeFailedTPS;
    private final StatsItemSet topicAndGroupPullTPS;
    private final StatsItemSet topicAndGroupPullRT;

    /**
     * Creates the five statistic sets, all sharing the supplied executor and this class's logger.
     *
     * @param scheduledExecutorService executor handed to every {@link StatsItemSet}
     */
    public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
        this.topicAndGroupConsumeOKTPS =
            new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
        this.topicAndGroupConsumeRT =
            new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
        this.topicAndGroupConsumeFailedTPS =
            new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
        this.topicAndGroupPullTPS =
            new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
        this.topicAndGroupPullRT =
            new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
    }

    /** Lifecycle hook; currently does nothing. */
    public void start() {
    }

    /** Lifecycle hook; currently does nothing. */
    public void shutdown() {
    }

    /**
     * Records one pull RT sample for the given topic and group.
     *
     * @param group consumer group name
     * @param topic topic name
     * @param rt    RT value to add; values outside the {@code int} range are clamped
     */
    public void incPullRT(final String group, final String topic, final long rt) {
        this.topicAndGroupPullRT.addRTValue(buildStatsKey(group, topic), saturatedCast(rt), 1);
    }

    /**
     * Adds pulled messages to the pull counter for the given topic and group.
     *
     * @param group consumer group name
     * @param topic topic name
     * @param msgs  amount to add; values outside the {@code int} range are clamped
     */
    public void incPullTPS(final String group, final String topic, final long msgs) {
        this.topicAndGroupPullTPS.addValue(buildStatsKey(group, topic), saturatedCast(msgs), 1);
    }

    /**
     * Records one consume RT sample for the given topic and group.
     *
     * @param group consumer group name
     * @param topic topic name
     * @param rt    RT value to add; values outside the {@code int} range are clamped
     */
    public void incConsumeRT(final String group, final String topic, final long rt) {
        this.topicAndGroupConsumeRT.addRTValue(buildStatsKey(group, topic), saturatedCast(rt), 1);
    }

    /**
     * Adds successfully consumed messages to the counter for the given topic and group.
     *
     * @param group consumer group name
     * @param topic topic name
     * @param msgs  amount to add; values outside the {@code int} range are clamped
     */
    public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
        this.topicAndGroupConsumeOKTPS.addValue(buildStatsKey(group, topic), saturatedCast(msgs), 1);
    }

    /**
     * Adds failed messages to the failure counter for the given topic and group.
     *
     * @param group consumer group name
     * @param topic topic name
     * @param msgs  amount to add; values outside the {@code int} range are clamped
     */
    public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
        this.topicAndGroupConsumeFailedTPS.addValue(buildStatsKey(group, topic), saturatedCast(msgs), 1);
    }

    /**
     * Assembles a {@link ConsumeStatus} for the given topic and group from the current snapshots.
     *
     * <p>Each field is set only when its snapshot is non-null; otherwise the field keeps whatever
     * value a freshly constructed {@link ConsumeStatus} has.
     *
     * @param group consumer group name
     * @param topic topic name
     * @return a new {@link ConsumeStatus}, never {@code null}
     */
    public ConsumeStatus consumeStatus(final String group, final String topic) {
        final ConsumeStatus cs = new ConsumeStatus();

        final StatsSnapshot pullRT = this.getPullRT(group, topic);
        if (pullRT != null) {
            cs.setPullRT(pullRT.getAvgpt());
        }

        final StatsSnapshot pullTPS = this.getPullTPS(group, topic);
        if (pullTPS != null) {
            cs.setPullTPS(pullTPS.getTps());
        }

        final StatsSnapshot consumeRT = this.getConsumeRT(group, topic);
        if (consumeRT != null) {
            cs.setConsumeRT(consumeRT.getAvgpt());
        }

        final StatsSnapshot consumeOKTPS = this.getConsumeOKTPS(group, topic);
        if (consumeOKTPS != null) {
            cs.setConsumeOKTPS(consumeOKTPS.getTps());
        }

        final StatsSnapshot consumeFailedTPS = this.getConsumeFailedTPS(group, topic);
        if (consumeFailedTPS != null) {
            cs.setConsumeFailedTPS(consumeFailedTPS.getTps());
        }

        // The failed-message total comes from the hourly snapshot, unlike the rate fields above.
        final StatsSnapshot failedInHour =
            this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(buildStatsKey(group, topic));
        if (failedInHour != null) {
            cs.setConsumeFailedMsgs(failedInHour.getSum());
        }

        return cs;
    }

    /** Returns the minute-level pull RT snapshot for the given topic and group. */
    private StatsSnapshot getPullRT(final String group, final String topic) {
        return this.topicAndGroupPullRT.getStatsDataInMinute(buildStatsKey(group, topic));
    }

    /** Returns the minute-level pull snapshot for the given topic and group. */
    private StatsSnapshot getPullTPS(final String group, final String topic) {
        return this.topicAndGroupPullTPS.getStatsDataInMinute(buildStatsKey(group, topic));
    }

    /**
     * Returns the consume RT snapshot for the given topic and group.
     *
     * <p>The minute-level snapshot is preferred. When it is missing or its sum is zero, the
     * hourly snapshot is returned instead. The null guard matches the way
     * {@link #consumeStatus(String, String)} treats every snapshot as possibly {@code null}.
     */
    private StatsSnapshot getConsumeRT(final String group, final String topic) {
        final String statsKey = buildStatsKey(group, topic);
        StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(statsKey);
        if (statsData == null || 0 == statsData.getSum()) {
            statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(statsKey);
        }
        return statsData;
    }

    /** Returns the minute-level consume-OK snapshot for the given topic and group. */
    private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
        return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(buildStatsKey(group, topic));
    }

    /** Returns the minute-level consume-failed snapshot for the given topic and group. */
    private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
        return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(buildStatsKey(group, topic));
    }

    /** Builds the statistics key shared by all sets: {@code topic + "@" + group}. */
    private static String buildStatsKey(final String group, final String topic) {
        return topic + "@" + group;
    }

    /**
     * Narrows a {@code long} to {@code int}, clamping to the {@code int} range instead of
     * wrapping, so an oversized sample cannot turn into an unrelated (possibly negative) value.
     */
    private static int saturatedCast(final long value) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
    }
}