blob: 592c5d929a4dc5c8a3bc5f223f2c653d03d50423 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.manager.service.impl;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.text.DecimalFormat;
import io.streamnative.pulsar.manager.service.BrokerStatsService;
import io.streamnative.pulsar.manager.service.BrokersService;
import io.streamnative.pulsar.manager.service.ClustersService;
import io.streamnative.pulsar.manager.service.EnvironmentCacheService;
import io.streamnative.pulsar.manager.utils.HttpUtil;
import io.streamnative.pulsar.manager.entity.ConsumerStatsEntity;
import io.streamnative.pulsar.manager.entity.ConsumersStatsRepository;
import io.streamnative.pulsar.manager.entity.EnvironmentEntity;
import io.streamnative.pulsar.manager.entity.EnvironmentsRepository;
import io.streamnative.pulsar.manager.entity.PublisherStatsEntity;
import io.streamnative.pulsar.manager.entity.PublishersStatsRepository;
import io.streamnative.pulsar.manager.entity.PulsarManagerTopicStats;
import io.streamnative.pulsar.manager.entity.ReplicationStatsEntity;
import io.streamnative.pulsar.manager.entity.ReplicationsStatsRepository;
import io.streamnative.pulsar.manager.entity.SubscriptionStatsEntity;
import io.streamnative.pulsar.manager.entity.SubscriptionsStatsRepository;
import io.streamnative.pulsar.manager.entity.TopicStatsEntity;
import io.streamnative.pulsar.manager.entity.TopicsStatsRepository;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Service
@Configuration
@Component
@EnableScheduling
public class BrokerStatsServiceImpl implements BrokerStatsService {
private static final Logger log = LoggerFactory.getLogger(BrokerStatsServiceImpl.class);
@Value("${backend.directRequestHost}")
private String directRequestHost;
@Value("${clear.stats.interval}")
private Long clearStatsInterval;
@Autowired
private EnvironmentsRepository environmentsRepository;
@Autowired
private ClustersService clustersService;
@Autowired
private BrokersService brokersService;
@Autowired
private TopicsStatsRepository topicsStatsRepository;
@Autowired
private SubscriptionsStatsRepository subscriptionsStatsRepository;
@Autowired
private PublishersStatsRepository publishersStatsRepository;
@Autowired
private ReplicationsStatsRepository replicationsStatsRepository;
@Autowired
private ConsumersStatsRepository consumersStatsRepository;
@Autowired
private EnvironmentCacheService environmentCache;
private static final Map<String, String> header = new HashMap<String, String>(){{
put("Content-Type","application/json");
}};
public String forwarBrokerStatsMetrics(String broker, String requestHost) {
broker = checkServiceUrl(broker, requestHost);
return HttpUtil.doGet(broker + "/admin/v2/broker-stats/metrics", header);
}
public String forwardBrokerStatsTopics(String broker, String requestHost) {
broker = checkServiceUrl(broker, requestHost);
return HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
}
@Scheduled(initialDelayString = "${init.delay.interval}", fixedDelayString = "${insert.stats.interval}")
private void scheduleCollectStats() {
long unixTime = System.currentTimeMillis() / 1000L;
List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments();
Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>();
for (EnvironmentEntity env : environmentEntities) {
String serviceUrl = checkServiceUrl(null, env.getBroker());
Map<String, Object> clusterObject =
clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl);
List<HashMap<String, Object>> clusterLists = (List<HashMap<String, Object>>) clusterObject.get("data");
clusterLists.forEach((clusterMap) -> {
String cluster = (String) clusterMap.get("cluster");
Pair<String, String> envCluster = Pair.of(env.getName(), cluster);
collectStatsServiceUrls.put(envCluster, (String) clusterMap.get("serviceUrl"));
});
}
collectStatsServiceUrls.forEach((envCluster, serviceUrl) -> {
log.info("Start collecting stats from env {} / cluster {} @ {}",
envCluster.getLeft(), envCluster.getRight(), serviceUrl);
collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), serviceUrl);
});
log.info("Start clearing stats from broker");
clearStats(unixTime, clearStatsInterval / 1000);
}
public void collectStatsToDB(long unixTime, String env, String cluster, String serviceUrl) {
Map<String, Object> brokerObject = brokersService.getBrokersList(0, 0, cluster, serviceUrl);
List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data");
brokerLists.forEach((brokerMap) -> {
String tempBroker = (String) brokerMap.get("broker");
// TODO: handle other protocols
String broker = "http://" + tempBroker;
String result = HttpUtil.doGet(broker + "/admin/v2/broker-stats/topics", header);
Gson gson = new Gson();
HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>> brokerStatsTopicEntity = gson.fromJson(result,
new TypeToken<HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>>>() {
}.getType());
brokerStatsTopicEntity.forEach((namespace, namespaceStats) -> {
namespaceStats.forEach((bundle, bundleStats) -> {
bundleStats.forEach((persistent, persistentStats) -> {
persistentStats.forEach((topic, topicStats) -> {
DecimalFormat df = new DecimalFormat("#.##");
TopicStatsEntity topicStatsEntity = new TopicStatsEntity();
String[] topicPath = this.parseTopic(topic);
topicStatsEntity.setEnvironment(env);
topicStatsEntity.setCluster(cluster);
topicStatsEntity.setBroker(tempBroker);
topicStatsEntity.setTenant(topicPath[0]);
topicStatsEntity.setNamespace(topicPath[1]);
topicStatsEntity.setBundle(bundle);
topicStatsEntity.setPersistent(persistent);
topicStatsEntity.setTopic(topicPath[2]);
topicStatsEntity.setMsgRateIn(Double.parseDouble(df.format(topicStats.getMsgRateIn())));
topicStatsEntity.setMsgRateOut(Double.parseDouble(df.format(topicStats.getMsgRateOut())));
topicStatsEntity.setMsgThroughputIn(Double.parseDouble(df.format(topicStats.getMsgThroughputIn())));
topicStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(topicStats.getMsgThroughputOut())));
topicStatsEntity.setAverageMsgSize(Double.parseDouble(df.format(topicStats.getAverageMsgSize())));
topicStatsEntity.setStorageSize(Double.parseDouble(df.format(topicStats.getStorageSize())));
topicStatsEntity.setSubscriptionCount(topicStats.getSubscriptions().size());
topicStatsEntity.setProducerCount(topicStats.getPublishers().size());
topicStatsEntity.setTimestamp(unixTime);
long topicStatsId = topicsStatsRepository.save(topicStatsEntity);
if (topicStats.getSubscriptions() != null) {
topicStats.getSubscriptions().forEach((subscription, subscriptionStats) -> {
SubscriptionStatsEntity subscriptionStatsEntity = new SubscriptionStatsEntity();
subscriptionStatsEntity.setTopicStatsId(topicStatsId);
subscriptionStatsEntity.setSubscription(subscription);
subscriptionStatsEntity.setMsgRateOut(Double.parseDouble(df.format(subscriptionStats.getMsgRateOut())));
subscriptionStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(subscriptionStats.getMsgThroughputOut())));
subscriptionStatsEntity.setMsgRateRedeliver(Double.parseDouble(df.format(subscriptionStats.getMsgRateRedeliver())));
subscriptionStatsEntity.setNumberOfEntriesSinceFirstNotAckedMessage(
subscriptionStats.getNumberOfEntriesSinceFirstNotAckedMessage());
subscriptionStatsEntity.setTotalNonContiguousDeletedMessagesRange(
subscriptionStats.getTotalNonContiguousDeletedMessagesRange());
subscriptionStatsEntity.setMsgBacklog(subscriptionStats.getMsgBacklog());
subscriptionStatsEntity.setSubscriptionType(String.valueOf(subscriptionStats.getType()));
subscriptionStatsEntity.setMsgRateExpired(Double.parseDouble(df.format(subscriptionStats.getMsgRateExpired())));
subscriptionStatsEntity.setReplicated(subscriptionStats.isReplicated());
subscriptionStatsEntity.setTimestamp(unixTime);
long subscriptionStatsId = subscriptionsStatsRepository.save(subscriptionStatsEntity);
if (subscriptionStats.getConsumers() != null) {
subscriptionStats.getConsumers().forEach((consumerStats) -> {
ConsumerStatsEntity consumerStatsEntity = new ConsumerStatsEntity();
consumerStatsEntity.setSubscriptionStatsId(subscriptionStatsId);
consumerStatsEntity.setTopicStatsId(topicStatsId);
consumerStatsEntity.setReplicationStatsId(-1);
consumerStatsEntity.setConsumer(consumerStats.getConsumerName());
consumerStatsEntity.setMsgRateOut(Double.parseDouble(df.format(consumerStats.getMsgRateOut())));
consumerStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(consumerStats.getMsgThroughputOut())));
consumerStatsEntity.setMsgRateRedeliver(Double.parseDouble(df.format(consumerStats.getMsgRateRedeliver())));
consumerStatsEntity.setAvailablePermits(consumerStats.getAvailablePermits());
consumerStatsEntity.setAddress(consumerStats.getAddress());
consumerStatsEntity.setConnectedSince(consumerStats.getConnectedSince());
consumerStatsEntity.setClientVersion(consumerStats.getClientVersion());
consumerStatsEntity.setMetadata(gson.toJson(consumerStats.getMetadata()));
consumerStatsEntity.setTimestamp(unixTime);
consumersStatsRepository.save(consumerStatsEntity);
});
}
});
}
if (topicStats.getPublishers() != null) {
topicStats.getPublishers().forEach((producer) -> {
PublisherStatsEntity publisherStatsEntity = new PublisherStatsEntity();
publisherStatsEntity.setTopicStatsId(topicStatsId);
publisherStatsEntity.setProducerId(producer.getProducerId());
publisherStatsEntity.setProducerName(producer.getProducerName());
publisherStatsEntity.setMsgRateIn(Double.parseDouble(df.format(producer.getMsgRateIn())));
publisherStatsEntity.setMsgThroughputIn(Double.parseDouble(df.format(producer.getMsgThroughputIn())));
publisherStatsEntity.setAverageMsgSize(Double.parseDouble(df.format(producer.getAverageMsgSize())));
publisherStatsEntity.setAddress(producer.getAddress());
publisherStatsEntity.setConnectedSince(producer.getConnectedSince());
publisherStatsEntity.setClientVersion(producer.getClientVersion());
publisherStatsEntity.setMetadata(gson.toJson(producer.getMetadata()));
publisherStatsEntity.setTimestamp(unixTime);
publishersStatsRepository.save(publisherStatsEntity);
});
}
if (topicStats.getReplication() != null) {
topicStats.getReplication().forEach((replication, replicatorStats) -> {
ReplicationStatsEntity replicationStatsEntity = new ReplicationStatsEntity();
replicationStatsEntity.setCluster(replication);
replicationStatsEntity.setTopicStatsId(topicStatsId);
replicationStatsEntity.setMsgRateIn(Double.parseDouble(df.format(replicatorStats.getMsgRateIn())));
replicationStatsEntity.setMsgThroughputIn(Double.parseDouble(df.format(replicatorStats.getMsgThroughputIn())));
replicationStatsEntity.setMsgRateOut(Double.parseDouble(df.format(replicatorStats.getMsgRateOut())));
replicationStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(replicatorStats.getMsgThroughputOut())));
replicationStatsEntity.setMsgRateExpired(Double.parseDouble(df.format(replicatorStats.getMsgRateExpired())));
replicationStatsEntity.setReplicationBacklog(replicatorStats.getReplicationBacklog());
replicationStatsEntity.setConnected(replicatorStats.isConnected());
replicationStatsEntity.setReplicationDelayInSeconds(replicatorStats.getReplicationDelayInSeconds());
replicationStatsEntity.setInboundConnection(replicatorStats.getInboundConnection());
replicationStatsEntity.setInboundConnectedSince(replicatorStats.getInboundConnectedSince());
replicationStatsEntity.setOutboundConnection(replicatorStats.getOutboundConnection());
replicationStatsEntity.setOutboundConnectedSince(replicatorStats.getOutboundConnectedSince());
replicationStatsEntity.setTimestamp(unixTime);
replicationsStatsRepository.save(replicationStatsEntity);
});
}
});
});
});
});
});
}
public void clearStats(long nowTime, long timeInterval) {
consumersStatsRepository.remove(nowTime, timeInterval);
subscriptionsStatsRepository.remove(nowTime, timeInterval);
publishersStatsRepository.remove(nowTime, timeInterval);
replicationsStatsRepository.remove(nowTime, timeInterval);
topicsStatsRepository.remove(nowTime, timeInterval);
}
public static String checkServiceUrl(String serviceUrl, String requestHost) {
if (serviceUrl == null || serviceUrl.length() <= 0) {
serviceUrl = requestHost;
}
if (!serviceUrl.startsWith("http")) {
serviceUrl = "http://" + serviceUrl;
}
return serviceUrl;
}
private String[] parseTopic(String topic) {
String tntPath = topic.split("://")[1];
String[] topicPath = tntPath.split("/");
return topicPath;
}
}