| package org.apache.rocketmq.exporter.task; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.rocketmq.client.exception.MQClientException; |
| import org.apache.rocketmq.common.protocol.body.Connection; |
| import org.apache.rocketmq.common.protocol.body.ConsumerConnection; |
| import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; |
| import org.apache.rocketmq.exporter.service.RMQMetricsService; |
| import org.apache.rocketmq.remoting.exception.RemotingException; |
| import org.apache.rocketmq.tools.admin.MQAdminExt; |
| import org.slf4j.Logger; |
| |
| public class ClientMetricTaskRunnable implements Runnable { |
| private String consumerGroup; |
| private ConsumerConnection connection; |
| private boolean enableCollectJStack; |
| private MQAdminExt mqAdmin; |
| private Logger logger; |
| private RMQMetricsService metricsService; |
| |
| public ClientMetricTaskRunnable(String consumerGroup, ConsumerConnection connection, |
| boolean enableCollectJStack, MQAdminExt mqAdmin, Logger logger, |
| RMQMetricsService metricsService) { |
| this.consumerGroup = consumerGroup; |
| this.connection = connection; |
| this.enableCollectJStack = enableCollectJStack; |
| this.mqAdmin = mqAdmin; |
| this.logger = logger; |
| this.metricsService = metricsService; |
| } |
| |
| @Override |
| |
| public void run() { |
| if (this.connection == null || this.connection.getConnectionSet() == null || |
| this.connection.getConnectionSet().isEmpty()) { |
| return; |
| } |
| logger.debug(String.format("ClientMetricTask-group=%s,enable jstack=%s", |
| consumerGroup, |
| this.enableCollectJStack |
| |
| )); |
| long start = System.currentTimeMillis(); |
| ConsumerRunningInfo runningInfo = null; |
| for (Connection conn : this.connection.getConnectionSet()) { |
| try { |
| runningInfo = mqAdmin.getConsumerRunningInfo(this.consumerGroup, conn.getClientId(), this.enableCollectJStack); |
| } catch (InterruptedException | RemotingException e) { |
| logger.warn(String.format("ClientMetricTask-exception.ignore. group=%s,client id=%s, client addr=%s, language=%s,version=%d", |
| consumerGroup, |
| conn.getClientId(), |
| conn.getClientAddr(), |
| conn.getLanguage(), |
| conn.getVersion() |
| ), |
| e); |
| runningInfo = null; |
| } catch (MQClientException e) { |
| logger.warn(String.format("ClientMetricTask-exception.ignore. group=%s,client id=%s, client addr=%s, language=%s,version=%d, error code=%d, error msg=%s", |
| consumerGroup, |
| conn.getClientId(), |
| conn.getClientAddr(), |
| conn.getLanguage(), |
| conn.getVersion(), |
| e.getResponseCode(), |
| e.getErrorMessage()) |
| ); |
| runningInfo = null; |
| } |
| if (runningInfo == null) { |
| continue; |
| } |
| if (!StringUtils.isBlank(runningInfo.getJstack())) { |
| logger.error(String.format("group=%s, jstack=%s", consumerGroup, runningInfo.getJstack())); |
| } |
| if (runningInfo.getStatusTable() != null && !runningInfo.getStatusTable().isEmpty()) { |
| for (String topic : runningInfo.getStatusTable().keySet()) { |
| metricsService.getCollector().addConsumerClientFailedMsgCountsMetric( |
| this.consumerGroup, |
| topic, |
| conn.getClientAddr(), |
| conn.getClientId(), |
| runningInfo.getStatusTable().get(topic).getConsumeFailedMsgs()); |
| metricsService.getCollector().addConsumerClientFailedTPSMetric( |
| this.consumerGroup, |
| topic, |
| conn.getClientAddr(), |
| conn.getClientId(), |
| runningInfo.getStatusTable().get(topic).getConsumeFailedTPS()); |
| metricsService.getCollector().addConsumerClientOKTPSMetric( |
| this.consumerGroup, |
| topic, |
| conn.getClientAddr(), |
| conn.getClientId(), |
| runningInfo.getStatusTable().get(topic).getConsumeOKTPS()); |
| metricsService.getCollector().addConsumeRTMetricMetric( |
| this.consumerGroup, |
| topic, |
| conn.getClientAddr(), |
| conn.getClientId(), |
| runningInfo.getStatusTable().get(topic).getConsumeRT()); |
| metricsService.getCollector().addPullRTMetric( |
| this.consumerGroup, |
| topic, |
| conn.getClientAddr(), |
| conn.getClientId(), |
| runningInfo.getStatusTable().get(topic).getPullRT()); |
| metricsService.getCollector().addPullTPSMetric( |
| this.consumerGroup, |
| topic, |
| conn.getClientAddr(), |
| conn.getClientId(), |
| runningInfo.getStatusTable().get(topic).getPullTPS()); |
| |
| |
| } |
| |
| } |
| } |
| long cost = System.currentTimeMillis() - start; |
| logger.debug(String.format("one-ClientMetricTask-group=%s, cost=%d, online-instance count=%d", this.consumerGroup, cost, this.connection.getConnectionSet().size())); |
| } |
| } |