blob: 31d468394ff170702af8645acddca334acd25458 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;
import static com.google.common.base.Preconditions.checkArgument;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.MoreObjects;
import com.google.common.base.Splitter;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.jsonwebtoken.SignatureAlgorithm;
import io.prometheus.client.Collector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.zookeeper.CreateMode;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class PrometheusMetricsTest extends BrokerTestBase {
/**
 * Runs before every test method: performs the base class setup and then resets the metrics of
 * {@link AuthenticationProviderToken}.
 *
 * @throws Exception if the base setup fails
 */
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}
/**
 * Returns the base class default configuration with topic level policies and the system topic
 * switched off and with a 5 second broker shutdown timeout.
 *
 * @return the adjusted configuration instance obtained from the base class
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration serviceConfiguration = super.getDefaultConf();
    serviceConfiguration.setTopicLevelPoliciesEnabled(false);
    serviceConfiguration.setSystemTopicEnabled(false);
    // Waiting for the broker to shut down avoids flakiness: metrics could otherwise still be
    // unregistered asynchronously while the next test method is already running.
    serviceConfiguration.setBrokerShutdownTimeoutMs(5000L);
    return serviceConfiguration;
}
/**
 * Runs after every test method and delegates to the base class {@code internalCleanup()}.
 * Some tests in this class also call it directly before changing {@code conf} and calling
 * {@link #setup()} again.
 *
 * @throws Exception if the base cleanup fails
 */
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
 * Runs the publish rate limit check twice: first with the precise topic publish rate limiter
 * enabled, then with it disabled.
 */
@Test
public void testPublishRateLimitedTimes() throws Exception {
    for (boolean preciseRateLimit : new boolean[] {true, false}) {
        checkPublishRateLimitedTimes(preciseRateLimit);
    }
}
/**
 * Restarts the broker with publish throttling configured, sends a single message to the first of
 * three topics and verifies the {@code pulsar_publish_rate_limit_times} metric both before and after
 * the broker rates are updated.
 *
 * @param preciseRateLimit value passed to {@code setPreciseTopicPublishRateLimiterEnable}; it also
 *                         selects the throttling settings and the value expected for the topic whose
 *                         producer uses the second client connection
 * @throws Exception if the broker restart, a client operation or the reflective field access fails
 */
private void checkPublishRateLimitedTimes(boolean preciseRateLimit) throws Exception {
    final String rateLimitMetric = "pulsar_publish_rate_limit_times";

    // Restart the broker with the throttling settings that belong to the requested mode.
    cleanup();
    if (!preciseRateLimit) {
        conf.setBrokerPublisherThrottlingTickTimeMillis(1);
        conf.setBrokerPublisherThrottlingMaxMessageRate(1);
        conf.setBrokerPublisherThrottlingMaxByteRate(1);
    } else {
        conf.setBrokerPublisherThrottlingTickTimeMillis(10000000);
        conf.setMaxPublishRatePerTopicInMessages(1);
        conf.setMaxPublishRatePerTopicInBytes(1);
        conf.setBrokerPublisherThrottlingMaxMessageRate(100000);
        conf.setBrokerPublisherThrottlingMaxByteRate(10000000);
    }
    conf.setStatsUpdateFrequencyInSecs(100000000);
    conf.setPreciseTopicPublishRateLimiterEnable(preciseRateLimit);
    setup();

    final String ns1 = "prop/ns-abc1" + UUID.randomUUID();
    admin.namespaces().createNamespace(ns1, 1);
    final String topicPrefix = "persistent://" + ns1 + "/metrics";
    final String topicName = topicPrefix + UUID.randomUUID();
    final String topicName2 = topicPrefix + UUID.randomUUID();
    final String topicName3 = topicPrefix + UUID.randomUUID();

    // Use another connection
    @Cleanup
    PulsarClient client2 = newPulsarClient(lookupUrl.toString(), 0);

    Producer<byte[]> producer = pulsarClient.newProducer()
            .producerName("my-pub")
            .enableBatching(false)
            .topic(topicName)
            .create();
    Producer<byte[]> producer2 = pulsarClient.newProducer()
            .producerName("my-pub-2")
            .enableBatching(false)
            .topic(topicName2)
            .create();
    Producer<byte[]> producer3 = client2.newProducer()
            .producerName("my-pub-2")
            .enableBatching(false)
            .topic(topicName3)
            .create();
    producer.sendAsync(new byte[11]);

    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
    // The counter is read reflectively from AbstractTopic.
    Field field = AbstractTopic.class.getDeclaredField("publishRateLimitedTimes");
    field.setAccessible(true);
    Awaitility.await().untilAsserted(() -> {
        long limitedTimes = (long) field.get(persistentTopic);
        assertEquals(limitedTimes, 1);
    });

    @Cleanup
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    Multimap<String, Metric> metrics = parseMetrics(statsOut.toString());
    assertTrue(metrics.containsKey(rateLimitMetric));
    for (Metric sample : metrics.get(rateLimitMetric)) {
        if (!ns1.equals(sample.tags.get("namespace"))) {
            continue;
        }
        if (sample.tags.get("topic").equals(topicName)) {
            assertEquals(sample.value, 1);
        } else if (sample.tags.get("topic").equals(topicName2)) {
            assertEquals(sample.value, 1);
        } else if (sample.tags.get("topic").equals(topicName3)) {
            // When using precise rate limiting, we only trigger the rate limiting of the topic,
            // so if the topic is not using the same connection, the rate limiting times will be 0.
            // When using asynchronous rate limiting, we will trigger the broker-level rate limiting,
            // and all connections will be limited at this time.
            assertEquals(sample.value, preciseRateLimit ? 0 : 1);
        } else {
            fail("should not fail");
        }
    }

    // Stats updater will reset the stats
    pulsar.getBrokerService().updateRates();
    Awaitility.await().untilAsserted(() -> {
        long limitedTimes = (long) field.get(persistentTopic);
        assertEquals(limitedTimes, 0);
    });

    @Cleanup
    ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2);
    Multimap<String, Metric> metrics2 = parseMetrics(statsOut2.toString());
    assertTrue(metrics2.containsKey(rateLimitMetric));
    for (Metric sample : metrics2.get(rateLimitMetric)) {
        if (ns1.equals(sample.tags.get("namespace"))) {
            assertEquals(sample.value, 0);
        }
    }

    producer.close();
    producer2.close();
    producer3.close();
}
/**
 * Creates six topics in one namespace and three in another, then checks the per-namespace
 * {@code pulsar_topics_count} values and the number of samples emitted for the topic load time
 * metrics (with and without the {@code pulsar_} prefix).
 */
@Test
public void testMetricsTopicCount() throws Exception {
    final String ns1 = "prop/ns-abc1";
    final String ns2 = "prop/ns-abc2";
    admin.namespaces().createNamespace(ns1);
    admin.namespaces().createNamespace(ns2);

    final String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
    final String baseTopic2 = "persistent://" + ns2 + "/testMetricsTopicCount";
    for (int created = 0; created < 6; created++) {
        admin.topics().createNonPartitionedTopic(baseTopic1 + UUID.randomUUID());
    }
    for (int created = 0; created < 3; created++) {
        admin.topics().createNonPartitionedTopic(baseTopic2 + UUID.randomUUID());
    }
    Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

    @Cleanup
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    Multimap<String, Metric> metrics = parseMetrics(statsOut.toString());

    // Only samples tagged with one of the two namespaces created above are checked.
    for (Metric sample : metrics.get("pulsar_topics_count")) {
        if (ns1.equals(sample.tags.get("namespace"))) {
            assertEquals(sample.value, 6.0);
        }
        if (ns2.equals(sample.tags.get("namespace"))) {
            assertEquals(sample.value, 3.0);
        }
    }

    Collection<Metric> loadTimes = metrics.get("topic_load_times");
    Collection<Metric> loadTimesCount = metrics.get("topic_load_times_count");
    assertEquals(loadTimes.size(), 6);
    assertEquals(loadTimesCount.size(), 1);

    Collection<Metric> prefixedLoadTimes = metrics.get("pulsar_topic_load_times");
    Collection<Metric> prefixedLoadTimesCount = metrics.get("pulsar_topic_load_times_count");
    assertEquals(prefixedLoadTimes.size(), 6);
    assertEquals(prefixedLoadTimesCount.size(), 1);
}
/**
 * Sets the broker-side producer stats to a message rate of 10 and a throughput of 100 and checks
 * that the single {@code pulsar_average_msg_size} sample reported for the namespace is 10.
 */
@Test
public void testMetricsAvgMsgSize2() throws Exception {
    String ns1 = "prop/ns-abc1";
    admin.namespaces().createNamespace(ns1, 1);
    String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
    String topicName = baseTopic1 + UUID.randomUUID();
    // Typed producer (no raw type); @Cleanup closes it even when an assertion below fails.
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer().producerName("my-pub")
            .topic(topicName).create();
    PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
            .getTopic(topicName, false).get().get();
    // Overwrite the broker-side stats directly so the expected average is deterministic.
    org.apache.pulsar.broker.service.Producer producerInServer = persistentTopic.getProducers().get("my-pub");
    producerInServer.getStats().msgRateIn = 10;
    producerInServer.getStats().msgThroughputIn = 100;
    @Cleanup
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, true, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    assertTrue(metrics.containsKey("pulsar_average_msg_size"));
    Collection<Metric> avgMsgSizes = metrics.get("pulsar_average_msg_size");
    assertEquals(avgMsgSizes.size(), 1);
    avgMsgSizes.forEach(item -> {
        if (ns1.equals(item.tags.get("namespace"))) {
            assertEquals(item.value, 10);
        }
    });
}
/**
 * Publishes and consumes ten messages on two topics and verifies that every per-topic metric
 * checked here is reported once per topic with the expected topic, namespace and (for outbound
 * metrics) subscription tags, and that the topic load metrics carry the cluster tag.
 */
@Test
public void testPerTopicStats() throws Exception {
    final String namespace = "my-property/use/my-ns";
    final String topic1 = "persistent://my-property/use/my-ns/my-topic1";
    final String topic2 = "persistent://my-property/use/my-ns/my-topic2";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer()
            .topic(topic1)
            .subscriptionName("test")
            .subscribe();
    Consumer<byte[]> c2 = pulsarClient.newConsumer()
            .topic(topic2)
            .subscriptionName("test")
            .subscribe();
    final int messages = 10;
    for (int i = 0; i < messages; i++) {
        String message = "my-message-" + i;
        p1.send(message.getBytes());
        p2.send(message.getBytes());
    }
    for (int i = 0; i < messages; i++) {
        c1.acknowledge(c1.receive());
        c2.acknowledge(c2.receive());
    }
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    metrics.entries().forEach(e -> {
        System.out.println(e.getKey() + ": " + e.getValue());
    });
    // There should be 2 metrics with different tags for each topic
    List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_write_latency_le_1");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), namespace);
    cm = (List<Metric>) metrics.get("pulsar_producers_count");
    assertEquals(cm.size(), 2);
    // Check both samples: the first one was previously never inspected because index 1 was
    // asserted twice. The expected order matches the other per-topic metrics in this test.
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), namespace);
    cm = (List<Metric>) metrics.get("topic_load_times_count");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get("topic_load_failed_total");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), namespace);
    cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), namespace);
    cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(0).tags.get("subscription"), "test");
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), namespace);
    assertEquals(cm.get(1).tags.get("subscription"), "test");
    cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(0).tags.get("subscription"), "test");
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), namespace);
    assertEquals(cm.get(1).tags.get("subscription"), "test");
    p1.close();
    p2.close();
    c1.close();
    c2.close();
}
/**
 * Publishes and consumes ten messages on two topics and verifies that each broker-level metric
 * listed below is reported exactly once and is tagged with the cluster name {@code test}.
 */
@Test
public void testPerBrokerStats() throws Exception {
    final String firstTopic = "persistent://my-property/use/my-ns/my-topic1";
    final String secondTopic = "persistent://my-property/use/my-ns/my-topic2";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(firstTopic).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(secondTopic).create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer()
            .topic(firstTopic)
            .subscriptionName("test")
            .subscribe();
    Consumer<byte[]> c2 = pulsarClient.newConsumer()
            .topic(secondTopic)
            .subscriptionName("test")
            .subscribe();

    final int messages = 10;
    for (int sent = 0; sent < messages; sent++) {
        byte[] payload = ("my-message-" + sent).getBytes();
        p1.send(payload);
        p2.send(payload);
    }
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
        c2.acknowledge(c2.receive());
    }

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    Multimap<String, Metric> metrics = parseMetrics(statsOut.toString());

    // Checked in this order; each one must have a single sample carrying the cluster tag.
    final String[] brokerMetricNames = {
        "pulsar_broker_topics_count",
        "pulsar_broker_subscriptions_count",
        "pulsar_broker_producers_count",
        "pulsar_broker_consumers_count",
        "pulsar_broker_rate_in",
        "pulsar_broker_rate_out",
        "pulsar_broker_throughput_in",
        "pulsar_broker_throughput_out",
        "pulsar_broker_storage_size",
        "pulsar_broker_storage_logical_size",
        "pulsar_broker_storage_write_rate",
        "pulsar_broker_storage_read_rate",
        "pulsar_broker_msg_backlog",
    };
    for (String metricName : brokerMetricNames) {
        Collection<Metric> brokerMetrics = metrics.get(metricName);
        assertEquals(brokerMetrics.size(), 1);
        assertEquals(brokerMetrics.iterator().next().tags.get("cluster"), "test");
    }

    p1.close();
    p2.close();
    c1.close();
    c2.close();
}
/**
 * Verifies that the total message and byte counters of a topic survive a consumer reconnect:
 * five messages are consumed, the consumer is closed, five more are published and consumed by a
 * new consumer on the same subscription, and the totals must cover all ten messages.
 *
 * @throws Exception if a client operation or metrics generation fails
 */
@Test
public void testPerTopicStatsReconnect() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    final String namespace = "my-property/use/my-ns";
    final String subscription = "test";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic).create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subscription)
            .subscribe();

    final int messages = 5;
    final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message
    final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead;

    // First round: publish and consume with the original consumer, then disconnect it.
    for (int sent = 0; sent < messages; sent++) {
        p1.send(("my-message-" + sent).getBytes());
    }
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
    }
    c1.close();

    // Second round: publish again and consume with a fresh consumer on the same subscription.
    for (int sent = 0; sent < messages; sent++) {
        p1.send(("my-message-" + sent).getBytes());
    }
    Consumer<byte[]> c2 = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subscription)
            .subscribe();
    for (int received = 0; received < messages; received++) {
        c2.acknowledge(c2.receive());
    }
    p1.close();
    c2.close();

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    Multimap<String, Metric> metrics = parseMetrics(statsOut.toString());
    metrics.entries().forEach(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));

    List<Metric> cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
    assertEquals(cm.get(0).tags.get("topic"), topic);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);

    cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).value, (messages * 2));
    assertEquals(cm.get(0).tags.get("topic"), topic);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);

    cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
    assertEquals(cm.get(0).tags.get("topic"), topic);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(0).tags.get("subscription"), subscription);

    cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).value, (messages * 2));
    assertEquals(cm.get(0).tags.get("topic"), topic);
    assertEquals(cm.get(0).tags.get("namespace"), namespace);
    assertEquals(cm.get(0).tags.get("subscription"), subscription);
}
/**
 * Data provider supplying one boolean argument per invocation: {@code true} first, then
 * {@code false}.
 *
 * @return two single-element rows holding {@link Boolean#TRUE} and {@link Boolean#FALSE}
 */
@DataProvider(name = "cacheEnable")
public static Object[][] cacheEnable() {
    final Object[] enabledRow = {Boolean.TRUE};
    final Object[] disabledRow = {Boolean.FALSE};
    return new Object[][] {enabledRow, disabledRow};
}
/**
 * Restarts the broker with the managed ledger cache sized to 1 MB or 0 MB, publishes and consumes
 * two 2 MiB messages and verifies the read cache miss rate reported at topic/namespace level, at
 * broker level and by the managed ledger metrics, for scrapes with and without topic metrics.
 *
 * @param cacheEnable whether the managed ledger cache size is set to 1 MB ({@code true}) or to
 *                    0 MB ({@code false}); the asserted miss rate is 1.0 and 2.0 respectively
 * @throws Exception if the broker restart, a client operation or metrics generation fails
 */
@Test(dataProvider = "cacheEnable")
public void testStorageReadCacheMissesRate(boolean cacheEnable) throws Exception {
    cleanup();
    conf.setManagedLedgerStatsPeriodSeconds(Integer.MAX_VALUE);
    conf.setManagedLedgerCacheEvictionTimeThresholdMillis(Long.MAX_VALUE);
    conf.setCacheEvictionByMarkDeletedPosition(true);
    if (cacheEnable) {
        conf.setManagedLedgerCacheSizeMB(1);
    } else {
        conf.setManagedLedgerCacheSizeMB(0);
    }
    setup();
    // Value every miss-rate assertion below compares against.
    final double expectedMissRate = cacheEnable ? 1.0 : 2.0;
    String ns = "prop/ns-abc1";
    admin.namespaces().createNamespace(ns);
    String topic = "persistent://" + ns + "/testStorageReadCacheMissesRate" + UUID.randomUUID();
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer().enableBatching(false).topic(topic).create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("test")
            .subscribe();
    byte[] msg = new byte[2 * 1024 * 1024];
    new Random().nextBytes(msg);
    producer.send(msg);
    consumer.receive();
    // when cacheEnable, the second msg will read cache miss
    producer.send(msg);
    consumer.receive();
    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
    ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) persistentTopic.getManagedLedger());
    managedLedger.getMbean().refreshStats(1, TimeUnit.SECONDS);

    // includeTopicMetric true
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    metrics.entries().forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
    List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_read_cache_misses_rate");
    assertEquals(cm.size(), 1);
    assertEquals(cm.get(0).value, expectedMissRate);
    assertEquals(cm.get(0).tags.get("topic"), topic);
    assertEquals(cm.get(0).tags.get("namespace"), ns);
    assertEquals(cm.get(0).tags.get("cluster"), "test");
    List<Metric> brokerMetric = (List<Metric>) metrics.get("pulsar_broker_storage_read_cache_misses_rate");
    assertEquals(brokerMetric.size(), 1);
    assertEquals(brokerMetric.get(0).value, expectedMissRate);
    assertEquals(brokerMetric.get(0).tags.get("cluster"), "test");
    assertNull(brokerMetric.get(0).tags.get("namespace"));
    assertNull(brokerMetric.get(0).tags.get("topic"));

    // includeTopicMetric false
    ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut2);
    String metricsStr2 = statsOut2.toString();
    Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
    metrics2.entries().forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
    List<Metric> cm2 = (List<Metric>) metrics2.get("pulsar_storage_read_cache_misses_rate");
    assertEquals(cm2.size(), 1);
    assertEquals(cm2.get(0).value, expectedMissRate);
    assertNull(cm2.get(0).tags.get("topic"));
    assertEquals(cm2.get(0).tags.get("namespace"), ns);
    assertEquals(cm2.get(0).tags.get("cluster"), "test");
    // Read the broker metric from the second scrape (metrics2). Reading it from the first scrape
    // would only repeat the assertions already made above and leave this output unchecked.
    List<Metric> brokerMetric2 = (List<Metric>) metrics2.get("pulsar_broker_storage_read_cache_misses_rate");
    assertEquals(brokerMetric2.size(), 1);
    assertEquals(brokerMetric2.get(0).value, expectedMissRate);
    assertEquals(brokerMetric2.get(0).tags.get("cluster"), "test");
    assertNull(brokerMetric2.get(0).tags.get("namespace"));
    assertNull(brokerMetric2.get(0).tags.get("topic"));

    // test ManagedLedgerMetrics
    List<Metric> mlMetric = ((List<Metric>) metrics.get("pulsar_ml_ReadEntriesOpsCacheMissesRate"));
    assertEquals(mlMetric.size(), 1);
    assertEquals(mlMetric.get(0).value, expectedMissRate);
    assertEquals(mlMetric.get(0).tags.get("cluster"), "test");
    assertEquals(mlMetric.get(0).tags.get("namespace"), ns + "/persistent");
}
/**
 * Publishes ten messages to two topics that each have an idle subscription, forces message
 * expiry and verifies the tags and values of the {@code pulsar_subscription_last_expire_timestamp},
 * {@code pulsar_subscription_msg_rate_expired} and {@code pulsar_subscription_total_msg_expired}
 * metrics against the state read from the subscriptions.
 */
@Test
public void testPerTopicExpiredStat() throws Exception {
    String ns = "prop/ns-abc1";
    admin.namespaces().createNamespace(ns);
    String topic1 = "persistent://" + ns + "/testPerTopicExpiredStat1";
    String topic2 = "persistent://" + ns + "/testPerTopicExpiredStat2";
    // Listed as (topic2, topic1): the value checks below pair topicList.get(i) with the i-th
    // metric sample, and the tag assertions expect topic2 first.
    List<String> topicList = Arrays.asList(topic2, topic1);
    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create();
    final String subName = "test";
    // Create the subscriptions and disconnect right away so the messages stay unconsumed.
    for (String topic : topicList) {
        pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe().close();
    }
    final int messages = 10;
    for (int i = 0; i < messages; i++) {
        String message = "my-message-" + i;
        p1.send(message.getBytes());
        p2.send(message.getBytes());
    }
    p1.close();
    p2.close();
    // Let the message expire
    for (String topic : topicList) {
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
        persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1);
    }
    pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
    //wait for checkMessageExpiry
    PersistentSubscription sub = (PersistentSubscription)
            pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName);
    PersistentSubscription sub2 = (PersistentSubscription)
            pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName);
    Awaitility.await().until(() -> sub.getExpiredMessageRate() != 0.0);
    Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0);
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    // There should be 2 metrics with different tags for each topic
    List<Metric> cm = (List<Metric>) metrics.get("pulsar_subscription_last_expire_timestamp");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), ns);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), ns);
    //check value
    Field field = PersistentSubscription.class.getDeclaredField("lastExpireTimestamp");
    field.setAccessible(true);
    for (int i = 0; i < topicList.size(); i++) {
        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
        // TestNG order is (actual, expected): the metric is the value under test.
        assertEquals((long) cm.get(i).value, (long) field.get(subscription));
    }
    cm = (List<Metric>) metrics.get("pulsar_subscription_msg_rate_expired");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), ns);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), ns);
    //check value
    field = PersistentSubscription.class.getDeclaredField("expiryMonitor");
    field.setAccessible(true);
    // Format with a fixed locale and without grouping so the text always parses back as a double,
    // whatever the JVM default locale is and however large the rate gets.
    NumberFormat nf = NumberFormat.getNumberInstance(java.util.Locale.ROOT);
    nf.setGroupingUsed(false);
    nf.setMaximumFractionDigits(3);
    nf.setRoundingMode(RoundingMode.DOWN);
    for (int i = 0; i < topicList.size(); i++) {
        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
        PersistentMessageExpiryMonitor monitor = (PersistentMessageExpiryMonitor) field.get(subscription);
        // Expected value: the monitor's rate truncated to three decimals.
        double expectedRate = Double.parseDouble(nf.format(monitor.getMessageExpiryRate()));
        assertEquals(cm.get(i).value, expectedRate);
    }
    cm = (List<Metric>) metrics.get("pulsar_subscription_total_msg_expired");
    assertEquals(cm.size(), 2);
    assertEquals(cm.get(0).tags.get("topic"), topic2);
    assertEquals(cm.get(0).tags.get("namespace"), ns);
    assertEquals(cm.get(1).tags.get("topic"), topic1);
    assertEquals(cm.get(1).tags.get("namespace"), ns);
    //check value
    for (int i = 0; i < topicList.size(); i++) {
        assertEquals((long) cm.get(i).value, messages);
    }
}
/**
 * Produces and consumes ten messages, registers a mocked second broker, refreshes the broker
 * rates and the local broker data, and verifies that the bundle and load balancer metrics listed
 * below appear in a scrape generated without topic metrics.
 */
@Test
public void testBundlesMetrics() throws Exception {
    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic).create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("test")
            .subscribe();

    final int messages = 10;
    for (int sent = 0; sent < messages; sent++) {
        p1.send(("my-message-" + sent).getBytes());
    }
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
    }

    // Mock another broker to make split task work.
    final String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
    mockZooKeeper.create(mockedBroker, new byte[]{0}, Collections.emptyList(), CreateMode.EPHEMERAL);

    pulsar.getBrokerService().updateRates();
    Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
    ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) pulsar.getLoadManager().get();
    loadManager.getLoadManager().updateLocalBrokerData();

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
    Multimap<String, Metric> metrics = parseMetrics(statsOut.toString());

    // Every one of these metric names must be present in the scrape.
    final String[] expectedMetricNames = {
        "pulsar_bundle_msg_rate_in",
        "pulsar_bundle_msg_rate_out",
        "pulsar_bundle_topics_count",
        "pulsar_bundle_consumer_count",
        "pulsar_bundle_producer_count",
        "pulsar_bundle_msg_throughput_in",
        "pulsar_bundle_msg_throughput_out",
        "pulsar_lb_cpu_usage",
        "pulsar_lb_memory_usage",
        "pulsar_lb_directMemory_usage",
        "pulsar_lb_bandwidth_in_usage",
        "pulsar_lb_bandwidth_out_usage",
        "pulsar_lb_bundles_split_total",
    };
    for (String expectedMetricName : expectedMetricNames) {
        assertTrue(metrics.containsKey(expectedMetricName));
    }

    // cleanup.
    mockZooKeeper.delete(mockedBroker, 0);
}
/**
 * Verifies that the subscription-level metric names are present in the Prometheus output after
 * publishing to, and consuming from, a non-persistent topic.
 *
 * <p>The metrics are generated with only the first boolean flag of
 * {@code PrometheusMetricsGenerator.generate} enabled (presumably topic-level metrics; confirm
 * against the generator's signature).
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testNonPersistentSubMetrics() throws Exception {
    final String topic = "non-persistent://my-property/use/my-ns/my-topic1";
    // @Cleanup releases the producer/consumer even when an assertion below fails.
    @Cleanup
    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic).create();
    @Cleanup
    Consumer<byte[]> c1 = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("test")
            .subscribe();

    final int messages = 100;
    for (int i = 0; i < messages; i++) {
        String message = "my-message-" + i;
        p1.send(message.getBytes());
    }
    for (int i = 0; i < messages; i++) {
        c1.acknowledge(c1.receive());
    }

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);

    final String[] expectedMetricNames = {
        "pulsar_subscription_back_log",
        "pulsar_subscription_back_log_no_delayed",
        "pulsar_subscription_msg_throughput_out",
        "pulsar_throughput_out",
        "pulsar_subscription_msg_rate_redeliver",
        "pulsar_subscription_unacked_messages",
        "pulsar_subscription_blocked_on_unacked_messages",
        "pulsar_subscription_msg_rate_out",
        "pulsar_out_bytes_total",
        "pulsar_out_messages_total",
        "pulsar_subscription_last_expire_timestamp",
        "pulsar_subscription_msg_drop_rate",
        "pulsar_subscription_consumers_count",
    };
    for (String metricName : expectedMetricNames) {
        // Name the metric in the failure message so a missing one is immediately identifiable.
        assertTrue(metrics.containsKey(metricName), "Missing metric: " + metricName);
    }
}
/**
 * Publishes to two topics in the same namespace and checks that, with all three boolean flags of
 * {@code PrometheusMetricsGenerator.generate} set to {@code false}, each inspected metric is
 * reported exactly once and carries the namespace tag.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testPerNamespaceStats() throws Exception {
    final String namespace = "my-property/use/my-ns";
    final String firstTopic = "persistent://my-property/use/my-ns/my-topic1";
    final String secondTopic = "persistent://my-property/use/my-ns/my-topic2";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(firstTopic).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(secondTopic).create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer().topic(firstTopic).subscriptionName("test").subscribe();
    Consumer<byte[]> c2 = pulsarClient.newConsumer().topic(secondTopic).subscriptionName("test").subscribe();

    final int messages = 10;
    for (int sent = 0; sent < messages; sent++) {
        String body = "my-message-" + sent;
        p1.send(body.getBytes());
        p2.send(body.getBytes());
    }
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
        c2.acknowledge(c2.receive());
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());
    for (Map.Entry<String, Metric> entry : parsed.entries()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
    }

    // These metrics must appear once, without a topic tag, labelled with the namespace.
    for (String name : new String[]{"pulsar_storage_write_latency_le_1", "pulsar_producers_count"}) {
        List<Metric> samples = (List<Metric>) parsed.get(name);
        assertEquals(samples.size(), 1);
        assertNull(samples.get(0).tags.get("topic"));
        assertEquals(samples.get(0).tags.get("namespace"), namespace);
    }

    // The in/out counters must appear once, labelled with the namespace.
    final String[] counters = {
        "pulsar_in_bytes_total",
        "pulsar_in_messages_total",
        "pulsar_out_bytes_total",
        "pulsar_out_messages_total",
    };
    for (String name : counters) {
        List<Metric> samples = (List<Metric>) parsed.get(name);
        assertEquals(samples.size(), 1);
        assertEquals(samples.get(0).tags.get("namespace"), namespace);
    }

    p1.close();
    p2.close();
    c1.close();
    c2.close();
}
/**
 * Creates two named producers on two topics and checks the tags of the per-producer rate and
 * throughput metrics. The assertions rely on the order in which the samples are emitted: the
 * second topic's producer is expected first.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testPerProducerStats() throws Exception {
    final String namespace = "my-property/use/my-ns";
    final String topicOne = "persistent://my-property/use/my-ns/my-topic1";
    final String topicTwo = "persistent://my-property/use/my-ns/my-topic2";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topicOne).producerName("producer1").create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(topicTwo).producerName("producer2").create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer().topic(topicOne).subscriptionName("Test").subscribe();
    Consumer<byte[]> c2 = pulsarClient.newConsumer().topic(topicTwo).subscriptionName("Test").subscribe();

    final int messages = 10;
    for (int sent = 0; sent < messages; sent++) {
        String body = "my-message-" + sent;
        p1.send(body.getBytes());
        p2.send(body.getBytes());
    }
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
        c2.acknowledge(c2.receive());
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, true, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());
    for (Map.Entry<String, Metric> entry : parsed.entries()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
    }

    // Both per-producer metrics are expected to carry the same tag sets.
    for (String name : new String[]{"pulsar_producer_msg_rate_in", "pulsar_producer_msg_throughput_in"}) {
        List<Metric> samples = (List<Metric>) parsed.get(name);
        assertEquals(samples.size(), 2);

        Metric first = samples.get(0);
        assertEquals(first.tags.get("namespace"), namespace);
        assertEquals(first.tags.get("topic"), topicTwo);
        assertEquals(first.tags.get("producer_name"), "producer2");
        assertEquals(first.tags.get("producer_id"), "1");

        Metric second = samples.get(1);
        assertEquals(second.tags.get("namespace"), namespace);
        assertEquals(second.tags.get("topic"), topicOne);
        assertEquals(second.tags.get("producer_name"), "producer1");
        assertEquals(second.tags.get("producer_id"), "0");
    }

    p1.close();
    p2.close();
    c1.close();
    c2.close();
}
/**
 * Publishes to and consumes from two topics, then checks the tags on the outbound byte and
 * message counters when the first two boolean flags of {@code PrometheusMetricsGenerator.generate}
 * are enabled.
 *
 * <p>Four samples are expected per metric: for each topic, one sample without a consumer id
 * followed by one with it. The second topic's samples are expected first.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testPerConsumerStats() throws Exception {
    final String namespace = "my-property/use/my-ns";
    final String topicOne = "persistent://my-property/use/my-ns/my-topic1";
    final String topicTwo = "persistent://my-property/use/my-ns/my-topic2";

    Producer<byte[]> p1 = pulsarClient.newProducer().topic(topicOne).create();
    Producer<byte[]> p2 = pulsarClient.newProducer().topic(topicTwo).create();
    Consumer<byte[]> c1 = pulsarClient.newConsumer().topic(topicOne).subscriptionName("test").subscribe();
    Consumer<byte[]> c2 = pulsarClient.newConsumer().topic(topicTwo).subscriptionName("test").subscribe();

    final int messages = 10;
    for (int sent = 0; sent < messages; sent++) {
        String body = "my-message-" + sent;
        p1.send(body.getBytes());
        p2.send(body.getBytes());
    }
    for (int received = 0; received < messages; received++) {
        c1.acknowledge(c1.receive());
        c2.acknowledge(c2.receive());
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, true, false, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());
    for (Map.Entry<String, Metric> entry : parsed.entries()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
    }

    for (String name : new String[]{"pulsar_out_bytes_total", "pulsar_out_messages_total"}) {
        List<Metric> samples = (List<Metric>) parsed.get(name);
        assertEquals(samples.size(), 4);
        for (int idx = 0; idx < 4; idx++) {
            Metric sample = samples.get(idx);
            assertEquals(sample.tags.get("namespace"), namespace);
            // Samples 0-1 belong to the second topic, samples 2-3 to the first.
            assertEquals(sample.tags.get("topic"), idx < 2 ? topicTwo : topicOne);
            assertEquals(sample.tags.get("subscription"), "test");
            // Only the odd-indexed samples are checked for a consumer id.
            if (idx == 1) {
                assertEquals(sample.tags.get("consumer_id"), "1");
            } else if (idx == 3) {
                assertEquals(sample.tags.get("consumer_id"), "0");
            }
        }
    }

    p1.close();
    p2.close();
    c1.close();
    c2.close();
}
/**
 * Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser
 * finds a TYPE definition for the same metric more than once, it errors out:
 * https://github.com/prometheus/prometheus/blob/f04b1b5559a80a4fd1745cf891ce392a056460c9/vendor/github.com/prometheus/common/expfmt/text_parse.go#L499-L502
 *
 * <p>This can happen when including topic metrics, since the same metric is reported multiple times with
 * different labels. For example:
 * <pre>
 * # TYPE pulsar_subscriptions_count gauge
 * pulsar_subscriptions_count{cluster="standalone"} 0
 * pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/metadata"} 1.0
 * pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/coordinate"} 1.0
 * pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/assignments"} 1.0
 * </pre>
 *
 * <p>The test also verifies that every TYPE line precedes the first sample of its metric, and that every
 * sample without its own TYPE line ends in a known Prometheus suffix whose base name has a TYPE line.
 *
 * @throws Exception if broker restart, client operations or metrics generation fail
 */
// Running the test twice to make sure types are present when generated multiple times
@Test(invocationCount = 2)
public void testDuplicateMetricTypeDefinitions() throws Exception {
    cleanup();
    conf.setTransactionCoordinatorEnabled(true);
    conf.setTransactionLogBatchedWriteEnabled(true);
    conf.setTransactionPendingAckBatchedWriteEnabled(true);
    try {
        setup();
        Set<String> allPrometheusSuffixString = allPrometheusSuffixEnums();
        // @Cleanup closes the producers even when an assertion below fails.
        @Cleanup
        Producer<byte[]> p1 =
                pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        @Cleanup
        Producer<byte[]> p2 =
                pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            p1.send(message.getBytes());
            p2.send(message.getBytes());
        }
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
        String metricsStr = statsOut.toString();

        Map<String, String> typeDefs = new HashMap<>();
        Set<String> metricNames = new HashSet<>();
        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
        Splitter.on("\n").split(metricsStr).forEach(line -> {
            if (line.isEmpty()) {
                return;
            }
            if (line.startsWith("#")) {
                // Check for duplicate type definitions
                Matcher typeMatcher = typePattern.matcher(line);
                checkArgument(typeMatcher.matches());
                String metricName = typeMatcher.group(1);
                String type = typeMatcher.group(2);
                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
                // "Only one TYPE line may exist for a given metric name."
                if (!typeDefs.containsKey(metricName)) {
                    typeDefs.put(metricName, type);
                } else {
                    // Dump the output before failing: fail() throws, so anything after it never runs.
                    System.out.println(metricsStr);
                    fail("Duplicate type definition found for TYPE definition " + metricName);
                }
                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
                // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
                if (metricNames.contains(metricName)) {
                    System.out.println(metricsStr);
                    fail("TYPE definition for " + metricName + " appears after first sample");
                }
            } else {
                Matcher metricMatcher = metricNamePattern.matcher(line);
                checkArgument(metricMatcher.matches());
                metricNames.add(metricMatcher.group(1));
            }
        });

        // Metrics with no type definition
        for (String metricName : metricNames) {
            if (typeDefs.containsKey(metricName)) {
                continue;
            }
            // This may be OK if this is a suffixed sample (e.g. _sum or _count) of a typed metric family.
            // Every matching suffix is tried because one suffix can be the tail of another
            // (e.g. "_count" and "_gcount"), and the set's iteration order is unspecified.
            boolean hasKnownSuffix = false;
            boolean hasFamilyType = false;
            for (String suffix : allPrometheusSuffixString) {
                if (!metricName.endsWith(suffix)) {
                    continue;
                }
                hasKnownSuffix = true;
                // Strip the trailing suffix only; indexOf() would cut at an earlier occurrence.
                String summaryMetricName = metricName.substring(0, metricName.length() - suffix.length());
                if (typeDefs.containsKey(summaryMetricName)) {
                    hasFamilyType = true;
                    break;
                }
            }
            if (!hasKnownSuffix) {
                fail("Metric " + metricName + " does not have a type definition");
            }
            if (!hasFamilyType) {
                fail("Metric " + metricName + " does not have a corresponding summary type definition");
            }
        }
    } finally {
        // Always restore the shared configuration so a failure here cannot leak into other tests.
        conf.setTransactionCoordinatorEnabled(false);
        conf.setTransactionLogBatchedWriteEnabled(false);
        conf.setTransactionPendingAckBatchedWriteEnabled(false);
    }
}
/**
 * Collects the name suffixes that the Prometheus client attaches to a metric family name.
 *
 * <p>For every {@code Collector.Type}, an empty metric family with a placeholder name is created and the
 * placeholder prefix is stripped from each name it reports; blank remainders are discarded.
 * According to the original author this yields
 * ["_sum", "_info", "_bucket", "_count", "_total", "_created", "_gsum", "_gcount"]
 * (this depends on the Prometheus client version in use).
 *
 * @return the set of non-blank suffixes
 */
public static Set<String> allPrometheusSuffixEnums() {
    final String placeholderName = "123";
    final int prefixLength = placeholderName.length();
    Set<String> suffixes = new HashSet<>();
    for (Collector.Type type : Collector.Type.values()) {
        Collector.MetricFamilySamples family =
                new Collector.MetricFamilySamples(placeholderName, type, "", new ArrayList<>());
        for (String reportedName : family.getNames()) {
            String suffix = reportedName.substring(prefixLength);
            if (StringUtils.isNotBlank(suffix)) {
                suffixes.add(suffix);
            }
        }
    }
    return suffixes;
}
/**
 * Publishes to two topics and checks that the managed-ledger cache metrics are each reported
 * exactly once and tagged with cluster {@code test}.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testManagedLedgerCacheStats() throws Exception {
    Producer<byte[]> p1 = pulsarClient.newProducer()
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .create();
    Producer<byte[]> p2 = pulsarClient.newProducer()
            .topic("persistent://my-property/use/my-ns/my-topic2")
            .create();
    for (int sent = 0; sent < 10; sent++) {
        String body = "my-message-" + sent;
        p1.send(body.getBytes());
        p2.send(body.getBytes());
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());
    for (Map.Entry<String, Metric> entry : parsed.entries()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
    }

    for (String name : new String[]{"pulsar_ml_cache_evictions", "pulsar_ml_cache_hits_rate"}) {
        List<Metric> samples = (List<Metric>) parsed.get(name);
        assertEquals(samples.size(), 1);
        assertEquals(samples.get(0).tags.get("cluster"), "test");
    }

    p1.close();
    p2.close();
}
/**
 * Publishes to two topics in each of two namespaces, validates the TYPE lines of the generated
 * Prometheus output, and checks that the managed-ledger add-entry rate metrics are reported once
 * per namespace.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testManagedLedgerStats() throws Exception {
    final String[] topics = {
        "persistent://my-property/use/my-ns/my-topic1",
        "persistent://my-property/use/my-ns/my-topic2",
        "persistent://my-property/use/my-ns2/my-topic1",
        "persistent://my-property/use/my-ns2/my-topic2",
    };
    List<Producer<byte[]>> producers = new ArrayList<>();
    for (String topic : topics) {
        producers.add(pulsarClient.newProducer().topic(topic).create());
    }
    for (int sent = 0; sent < 10; sent++) {
        String body = "my-message-" + sent;
        for (Producer<byte[]> producer : producers) {
            producer.send(body.getBytes());
        }
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
    String metricsStr = out.toString();
    Multimap<String, Metric> parsed = parseMetrics(metricsStr);
    for (Map.Entry<String, Metric> entry : parsed.entries()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
    }

    Map<String, String> typeDefs = new HashMap<>();
    Set<String> sampledNames = new HashSet<>();
    Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
    Pattern samplePattern = Pattern.compile("^(\\w+)\\{.+");
    for (String line : Splitter.on("\n").split(metricsStr)) {
        if (line.isEmpty()) {
            continue;
        }
        if (!line.startsWith("#")) {
            // A sample line: remember that this metric name has been seen.
            Matcher sampleMatcher = samplePattern.matcher(line);
            checkArgument(sampleMatcher.matches());
            sampledNames.add(sampleMatcher.group(1));
            continue;
        }
        // A comment line: it must be a TYPE definition.
        Matcher typeMatcher = typePattern.matcher(line);
        checkArgument(typeMatcher.matches());
        String name = typeMatcher.group(1);
        String type = typeMatcher.group(2);
        // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
        // "Only one TYPE line may exist for a given metric name."
        if (typeDefs.containsKey(name)) {
            fail("Duplicate type definition found for TYPE definition " + name);
        }
        typeDefs.put(name, type);
        // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
        // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
        if (sampledNames.contains(name)) {
            fail("TYPE definition for " + name + " appears after first sample");
        }
    }

    for (String name : new String[]{"pulsar_ml_AddEntryBytesRate", "pulsar_ml_AddEntryMessagesRate"}) {
        List<Metric> samples = (List<Metric>) parsed.get(name);
        assertEquals(samples.size(), 2);
        assertEquals(samples.get(0).tags.get("cluster"), "test");
        String ns = samples.get(0).tags.get("namespace");
        assertTrue(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"));
    }

    for (Producer<byte[]> producer : producers) {
        producer.close();
    }
}
/**
 * Publishes to two topics and checks the sample counts and cluster tag of three managed-ledger
 * BookKeeper client scheduler metrics. The full metric names are resolved by substring through
 * {@code keyNameBySubstrings}.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testManagedLedgerBookieClientStats() throws Exception {
    @Cleanup
    Producer<byte[]> p1 = pulsarClient.newProducer()
            .topic("persistent://my-property/use/my-ns/my-topic1")
            .create();
    @Cleanup
    Producer<byte[]> p2 = pulsarClient.newProducer()
            .topic("persistent://my-property/use/my-ns/my-topic2")
            .create();
    for (int sent = 0; sent < 10; sent++) {
        String body = "my-message-" + sent;
        p1.send(body.getBytes());
        p2.send(body.getBytes());
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());
    for (Map.Entry<String, Metric> entry : parsed.entries()) {
        System.out.println(entry.getKey() + ": " + entry.getValue());
    }

    // Name fragment of each scheduler metric, paired with the number of samples expected for it.
    final String[] nameFragments = {
        "bookkeeper_ml_scheduler_threads",
        "bookkeeper_ml_scheduler_task_execution_sum",
        "bookkeeper_ml_scheduler_max_queue_size",
    };
    final int[] expectedSampleCounts = {1, 2, 1};
    for (int idx = 0; idx < nameFragments.length; idx++) {
        String key = keyNameBySubstrings(parsed, "pulsar_managedLedger_client", nameFragments[idx]);
        List<Metric> samples = (List<Metric>) parsed.get(key);
        assertEquals(samples.size(), expectedSampleCounts[idx]);
        assertEquals(samples.get(0).tags.get("cluster"), "test");
    }
}
/**
 * Finds the first metric name that contains every one of the given substrings.
 *
 * @param metrics    parsed metrics whose keys are searched
 * @param substrings fragments that must all occur in the key
 * @return the first matching key in the iteration order of {@code metrics.keys()},
 *         or {@code null} when no key matches
 */
private static String keyNameBySubstrings(Multimap<String, Metric> metrics, String... substrings) {
    nextKey:
    for (String candidate : metrics.keys()) {
        for (String fragment : substrings) {
            if (!candidate.contains(fragment)) {
                // One fragment is missing, so move on to the next key.
                continue nextKey;
            }
        }
        return candidate;
    }
    return null;
}
/**
 * Performs one failing authentication (no credentials) and one successful authentication (valid
 * token) against a token provider, then checks that the generated Prometheus output contains a
 * matching sample in both the success and the failure counters.
 *
 * @throws IOException             if provider initialization, metrics generation or closing the provider fails
 * @throws AuthenticationException if the authentication that is expected to succeed fails
 */
@Test
public void testAuthMetrics() throws IOException, AuthenticationException {
    SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

    // @Cleanup closes the provider on every exit path, matching the other token tests in this class.
    @Cleanup
    AuthenticationProviderToken provider = new AuthenticationProviderToken();

    Properties properties = new Properties();
    properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));

    ServiceConfiguration conf = new ServiceConfiguration();
    conf.setProperties(properties);
    provider.initialize(conf);

    try {
        provider.authenticate(new AuthenticationDataSource() {
        });
        fail("Should have failed");
    } catch (AuthenticationException e) {
        // expected, no credential passed
    }

    String token = AuthTokenUtils.createToken(secretKey, "subject", Optional.empty());

    // Pulsar protocol auth
    provider.authenticate(new AuthenticationDataSource() {
        @Override
        public boolean hasDataFromCommand() {
            return true;
        }

        @Override
        public String getCommandData() {
            return token;
        }
    });

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
    String metricsStr = statsOut.toString();
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    String providerName = provider.getClass().getSimpleName();

    List<Metric> cm = (List<Metric>) metrics.get("pulsar_authentication_success_total");
    boolean haveSucceed = false;
    for (Metric metric : cm) {
        if (Objects.equals(metric.tags.get("auth_method"), "token")
                && Objects.equals(metric.tags.get("provider_name"), providerName)) {
            haveSucceed = true;
        }
    }
    assertTrue(haveSucceed, "No pulsar_authentication_success_total sample for provider " + providerName);

    cm = (List<Metric>) metrics.get("pulsar_authentication_failures_total");
    boolean haveFailed = false;
    for (Metric metric : cm) {
        if (Objects.equals(metric.tags.get("auth_method"), "token")
                && Objects.equals(metric.tags.get("reason"),
                AuthenticationProviderToken.ErrorCode.INVALID_AUTH_DATA.name())
                && Objects.equals(metric.tags.get("provider_name"), providerName)) {
            haveFailed = true;
        }
    }
    assertTrue(haveFailed, "No pulsar_authentication_failures_total sample for provider " + providerName);
}
/**
 * Authenticates with a token whose expiry lies one hour in the past, expects the authentication
 * to be rejected, and checks that exactly one {@code pulsar_expired_token_total} sample is exported.
 *
 * @throws Exception if provider setup or metrics generation fail
 */
@Test
public void testExpiredTokenMetrics() throws Exception {
    SecretKey signingKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

    Properties providerProps = new Properties();
    providerProps.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(signingKey));
    ServiceConfiguration providerConf = new ServiceConfiguration();
    providerConf.setProperties(providerProps);

    AuthenticationProviderToken provider = new AuthenticationProviderToken();
    provider.initialize(providerConf);

    long oneHourAgoMillis = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
    final String expiredToken =
            AuthTokenUtils.createToken(signingKey, "subject", Optional.of(new Date(oneHourAgoMillis)));
    AuthenticationDataSource expiredCredentials = new AuthenticationDataSource() {
        @Override
        public boolean hasDataFromCommand() {
            return true;
        }

        @Override
        public String getCommandData() {
            return expiredToken;
        }
    };

    try {
        provider.authenticate(expiredCredentials);
        fail("Should have failed");
    } catch (AuthenticationException e) {
        // expected, token was expired
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());
    List<Metric> expiredSamples = (List<Metric>) parsed.get("pulsar_expired_token_total");
    assertEquals(expiredSamples.size(), 1);

    provider.close();
}
/**
 * Authenticates with five tokens expiring 3, 7, 40, 100 and 400 minutes from now and checks the
 * {@code pulsar_expiring_token_minutes} count and cumulative bucket values.
 *
 * @throws Exception if provider setup, authentication or metrics generation fail
 */
@Test
public void testExpiringTokenMetrics() throws Exception {
    SecretKey signingKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

    Properties providerProps = new Properties();
    providerProps.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(signingKey));
    ServiceConfiguration providerConf = new ServiceConfiguration();
    providerConf.setProperties(providerProps);

    AuthenticationProviderToken provider = new AuthenticationProviderToken();
    provider.initialize(providerConf);

    final int[] tokenRemainTime = {3, 7, 40, 100, 400};
    for (int minutesLeft : tokenRemainTime) {
        long expiresAtMillis = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(minutesLeft);
        final String expiringToken =
                AuthTokenUtils.createToken(signingKey, "subject", Optional.of(new Date(expiresAtMillis)));
        AuthenticationDataSource credentials = new AuthenticationDataSource() {
            @Override
            public boolean hasDataFromCommand() {
                return true;
            }

            @Override
            public String getCommandData() {
                return expiringToken;
            }
        };
        provider.authenticate(credentials);
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
    Multimap<String, Metric> parsed = parseMetrics(out.toString());

    List<Metric> countSamples = (List<Metric>) parsed.get("pulsar_expiring_token_minutes_count");
    Metric countMetric = countSamples.get(0);
    assertEquals(countMetric.value, tokenRemainTime.length);

    List<Metric> buckets = (List<Metric>) parsed.get("pulsar_expiring_token_minutes_bucket");
    assertEquals(buckets.size(), 5);
    for (Metric bucket : buckets) {
        // Buckets are cumulative: each upper bound counts every token expiring within it.
        String upperBound = bucket.tags.get("le");
        int expectedCount;
        if (upperBound.equals("5.0")) {
            expectedCount = 1;
        } else if (upperBound.equals("10.0")) {
            expectedCount = 2;
        } else if (upperBound.equals("60.0")) {
            expectedCount = 3;
        } else if (upperBound.equals("240.0")) {
            expectedCount = 4;
        } else {
            expectedCount = 5;
        }
        assertEquals(bucket.value, expectedCount);
    }

    provider.close();
}
/**
 * Checks that {@code parseMetrics} reads a sample whose value is {@code +Inf} as
 * {@link Double#POSITIVE_INFINITY} and still parses its tags.
 */
@Test
public void testParsingWithPositiveInfinityValue() {
    final String sampleLine = "pulsar_broker_publish_latency{cluster=\"test\",quantile=\"0.0\"} +Inf";
    Multimap<String, Metric> parsed = parseMetrics(sampleLine);

    List<Metric> samples = (List<Metric>) parsed.get("pulsar_broker_publish_latency");
    assertEquals(samples.size(), 1);

    Metric only = samples.get(0);
    assertEquals(only.tags.get("cluster"), "test");
    assertEquals(only.tags.get("quantile"), "0.0");
    assertEquals(only.value, Double.POSITIVE_INFINITY);
}
/**
 * Checks that {@code parseMetrics} reads a sample whose value is {@code -Inf} as
 * {@link Double#NEGATIVE_INFINITY} and still parses its tags.
 */
@Test
public void testParsingWithNegativeInfinityValue() {
    final String sampleLine = "pulsar_broker_publish_latency{cluster=\"test\",quantile=\"0.0\"} -Inf";
    Multimap<String, Metric> parsed = parseMetrics(sampleLine);

    List<Metric> samples = (List<Metric>) parsed.get("pulsar_broker_publish_latency");
    assertEquals(samples.size(), 1);

    Metric only = samples.get(0);
    assertEquals(only.tags.get("cluster"), "test");
    assertEquals(only.tags.get("quantile"), "0.0");
    assertEquals(only.value, Double.NEGATIVE_INFINITY);
}
/**
 * Checks that {@code pulsar_ml_cursor_persistLedgerSucceed} is exported only while
 * {@code exposeManagedCursorMetricsInPrometheus} is enabled in the broker configuration.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testManagedCursorPersistStats() throws Exception {
    final String subName = "my-sub";
    final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
    final int messageSize = 10;
    final String cursorMetric = "pulsar_ml_cursor_persistLedgerSucceed";

    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionType(SubscriptionType.Shared)
            .ackTimeout(1, TimeUnit.SECONDS)
            .subscriptionName(subName)
            .subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

    // Each message is acknowledged right after it is published.
    for (int sent = 0; sent < messageSize; sent++) {
        String body = "my-message-" + sent;
        producer.send(body.getBytes());
        consumer.acknowledge(consumer.receive().getMessageId());
    }

    // enable ExposeManagedCursorMetricsInPrometheus
    pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(true);
    ByteArrayOutputStream enabledOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, enabledOut);
    Multimap<String, Metric> enabledMetrics = parseMetrics(enabledOut.toString());
    List<Metric> enabledSamples = (List<Metric>) enabledMetrics.get(cursorMetric);
    assertEquals(enabledSamples.size(), 1);
    Metric cursorSample = enabledSamples.get(0);
    assertEquals(cursorSample.tags.get("cluster"), "test");
    assertEquals(cursorSample.tags.get("cursor_name"), subName);

    // disable ExposeManagedCursorMetricsInPrometheus
    pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(false);
    ByteArrayOutputStream disabledOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, disabledOut);
    Multimap<String, Metric> disabledMetrics = parseMetrics(disabledOut.toString());
    List<Metric> disabledSamples = (List<Metric>) disabledMetrics.get(cursorMetric);
    assertEquals(disabledSamples.size(), 0);

    producer.close();
    consumer.close();
}
/**
 * Tracks the broker connection counters across three stages: after a producer connects, after
 * the client is closed, and after a second client fails to create a producer once
 * authentication has been enabled on the broker.
 *
 * @throws Exception if the client operations or metrics generation fail
 */
@Test
public void testBrokerConnection() throws Exception {
    final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";

    // Stage 1: creating a producer opens one connection.
    pulsarClient.newProducer().topic(topicName).create();
    ByteArrayOutputStream connectedOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, connectedOut);
    Multimap<String, Metric> connected = parseMetrics(connectedOut.toString());
    compareBrokerConnectionStateCount(
            (List<Metric>) connected.get("pulsar_connection_created_total_count"), 1.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) connected.get("pulsar_connection_create_success_count"), 1.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) connected.get("pulsar_connection_closed_total_count"), 0.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) connected.get("pulsar_active_connections"), 1.0);

    // Stage 2: closing the client closes that connection.
    pulsarClient.close();
    ByteArrayOutputStream closedOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, closedOut);
    Multimap<String, Metric> closed = parseMetrics(closedOut.toString());
    compareBrokerConnectionStateCount(
            (List<Metric>) closed.get("pulsar_connection_closed_total_count"), 1.0);

    // Stage 3: with authentication enabled, a new client must fail to create a producer.
    pulsar.getConfiguration().setAuthenticationEnabled(true);
    replacePulsarClient(PulsarClient.builder()
            .serviceUrl(lookupUrl.toString())
            .operationTimeout(1, TimeUnit.MILLISECONDS));
    try {
        pulsarClient.newProducer().topic(topicName).create();
        fail();
    } catch (Exception e) {
        assertTrue(e instanceof PulsarClientException.AuthenticationException);
    }
    pulsarClient.close();

    ByteArrayOutputStream rejectedOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, rejectedOut);
    Multimap<String, Metric> rejected = parseMetrics(rejectedOut.toString());
    compareBrokerConnectionStateCount(
            (List<Metric>) rejected.get("pulsar_connection_closed_total_count"), 2.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) rejected.get("pulsar_connection_create_fail_count"), 1.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) rejected.get("pulsar_connection_create_success_count"), 1.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) rejected.get("pulsar_active_connections"), 0.0);
    compareBrokerConnectionStateCount(
            (List<Metric>) rejected.get("pulsar_connection_created_total_count"), 2.0);
}
/**
 * Asserts that a connection metric has exactly one sample, tagged with cluster {@code test}
 * and broker {@code localhost}, whose value equals the expected count.
 *
 * @param cm    the samples parsed for one connection metric
 * @param count the expected sample value
 */
private void compareBrokerConnectionStateCount(List<Metric> cm, double count) {
    assertEquals(cm.size(), 1);
    Metric sample = cm.get(0);
    assertEquals(sample.tags.get("cluster"), "test");
    assertEquals(sample.tags.get("broker"), "localhost");
    assertEquals(sample.value, count);
}
/**
 * Smoke test: parses the bundled {@code prometheus_metrics_sample.txt} resource and passes if
 * {@code parseMetrics} does not throw.
 *
 * @throws IOException if the sample resource cannot be read
 */
@Test
void testParseMetrics() throws IOException {
    final String resourceName = "prometheus_metrics_sample.txt";
    // try-with-resources closes the classpath stream; a null resource is tolerated by the
    // construct, so the explicit assertion below gives a clear failure instead of an NPE.
    try (java.io.InputStream sampleStream = getClass().getClassLoader().getResourceAsStream(resourceName)) {
        Assert.assertNotNull(sampleStream, "Test resource not found on classpath: " + resourceName);
        String sampleMetrics = IOUtils.toString(sampleStream, StandardCharsets.UTF_8);
        parseMetrics(sampleMetrics);
    }
}
/**
 * Checks the compaction metrics of a topic before and after a compaction run.
 *
 * <p>Before compaction none of the compaction metrics may be exported. After publishing 1000
 * keyed messages over 10 distinct keys and compacting, each metric must have exactly one sample
 * with the expected value.
 *
 * @throws Exception if the client operations, compaction or metrics generation fail
 */
@Test
public void testCompaction() throws Exception {
    final String topicName = "persistent://my-namespace/use/my-ns/my-compaction1";
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    ByteArrayOutputStream beforeOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, beforeOut);
    Multimap<String, Metric> before = parseMetrics(beforeOut.toString());
    final String[] compactionMetrics = {
        "pulsar_compaction_removed_event_count",
        "pulsar_compaction_succeed_count",
        "pulsar_compaction_failed_count",
        "pulsar_compaction_duration_time_in_mills",
        "pulsar_compaction_read_throughput",
        "pulsar_compaction_write_throughput",
        "pulsar_compaction_compacted_entries_count",
        "pulsar_compaction_compacted_entries_size",
    };
    // Nothing has been compacted yet, so no compaction metric may be present.
    for (String name : compactionMetrics) {
        List<Metric> samples = (List<Metric>) before.get(name);
        assertEquals(samples.size(), 0);
    }

    final int numMessages = 1000;
    final int maxKeys = 10;
    // Fixed seed keeps the key sequence, and therefore the payload sizes, reproducible.
    Random keyPicker = new Random(0);
    for (int seq = 0; seq < numMessages; seq++) {
        String key = "key" + keyPicker.nextInt(maxKeys);
        byte[] payload = ("my-message-" + key + "-" + seq).getBytes();
        producer.newMessage().key(key).value(payload).send();
    }

    PulsarCompactionServiceFactory compactionFactory =
            (PulsarCompactionServiceFactory) pulsar.getCompactionServiceFactory();
    Compactor compactor = compactionFactory.getCompactor();
    compactor.compact(topicName).get();

    ByteArrayOutputStream afterOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, afterOut);
    Multimap<String, Metric> after = parseMetrics(afterOut.toString());

    List<Metric> removed = (List<Metric>) after.get("pulsar_compaction_removed_event_count");
    assertEquals(removed.size(), 1);
    assertEquals(removed.get(0).value, 990);

    List<Metric> succeeded = (List<Metric>) after.get("pulsar_compaction_succeed_count");
    assertEquals(succeeded.size(), 1);
    assertEquals(succeeded.get(0).value, 1);

    List<Metric> failed = (List<Metric>) after.get("pulsar_compaction_failed_count");
    assertEquals(failed.size(), 1);
    assertEquals(failed.get(0).value, 0);

    // Duration and throughput values are not deterministic; they only have to be positive.
    final String[] positiveMetrics = {
        "pulsar_compaction_duration_time_in_mills",
        "pulsar_compaction_read_throughput",
        "pulsar_compaction_write_throughput",
    };
    for (String name : positiveMetrics) {
        List<Metric> samples = (List<Metric>) after.get(name);
        assertEquals(samples.size(), 1);
        assertTrue(samples.get(0).value > 0);
    }

    List<Metric> compactedCount = (List<Metric>) after.get("pulsar_compaction_compacted_entries_count");
    assertEquals(compactedCount.size(), 1);
    assertEquals(compactedCount.get(0).value, 10);

    List<Metric> compactedSize = (List<Metric>) after.get("pulsar_compaction_compacted_entries_size");
    assertEquals(compactedSize.size(), 1);
    assertEquals(compactedSize.get(0).value, 840);

    pulsarClient.close();
}
/**
 * Generates the metrics twice in a row, four times over, with a mocked configuration that
 * enables response buffering. Whenever both generations fall into the same time window the
 * two outputs must be identical and parseable.
 */
@Test
public void testMetricsWithCache() throws Throwable {
    ServiceConfiguration mockedConf = Mockito.mock(ServiceConfiguration.class);
    Mockito.when(mockedConf.getManagedLedgerStatsPeriodSeconds()).thenReturn(2);
    Mockito.when(mockedConf.isMetricsBufferResponse()).thenReturn(true);
    Mockito.when(mockedConf.getClusterName()).thenReturn(configClusterName);
    Mockito.when(pulsar.getConfiguration()).thenReturn(mockedConf);

    final int period = pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds();
    final long periodMillis = TimeUnit.SECONDS.toMillis(period);
    TimeWindow<Object> timeWindow = new TimeWindow<>(2, (int) periodMillis);

    int round = 0;
    while (round < 4) {
        long before = System.currentTimeMillis();
        ByteArrayOutputStream firstOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, firstOut, null);
        ByteArrayOutputStream secondOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, secondOut, null);
        long after = System.currentTimeMillis();

        // Only compare when both generations started and ended within one window.
        if (timeWindow.currentWindowStart(before) == timeWindow.currentWindowStart(after)) {
            String firstText = firstOut.toString();
            String secondText = secondOut.toString();
            assertEquals(firstText, secondText);
            // The result is not needed; parsing alone asserts that every line is well-formed.
            parseMetrics(firstText);
        }

        Thread.sleep(TimeUnit.SECONDS.toMillis(period / 2));
        round++;
    }
}
/**
 * Verifies the {@code partition} label of {@code pulsar_consumers_count} when metrics are
 * generated with the topic/partition split enabled: samples in the namespace holding
 * non-partitioned topics carry {@code "-1"}, samples in the namespace holding partitioned
 * topics carry a non-negative partition index.
 */
@Test
public void testSplitTopicAndPartitionLabel() throws Exception {
    String ns1 = "prop/ns-abc1";
    String ns2 = "prop/ns-abc2";
    admin.namespaces().createNamespace(ns1);
    admin.namespaces().createNamespace(ns2);
    String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
    String baseTopic2 = "persistent://" + ns2 + "/testMetricsTopicCount";
    for (int i = 0; i < 6; i++) {
        admin.topics().createNonPartitionedTopic(baseTopic1 + UUID.randomUUID());
    }
    for (int i = 0; i < 3; i++) {
        admin.topics().createPartitionedTopic(baseTopic2 + UUID.randomUUID(), 3);
    }

    // @Cleanup closes the consumers even when one of the assertions below fails.
    @Cleanup
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
            .topicsPattern("persistent://" + ns1 + "/.*")
            .subscriptionName("sub")
            .subscribe();
    @Cleanup
    Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
            .topicsPattern("persistent://" + ns2 + "/.*")
            .subscriptionName("sub")
            .subscribe();

    @Cleanup
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut);
    // Decode with an explicit charset rather than the platform default.
    String metricsStr = statsOut.toString(StandardCharsets.UTF_8);
    Multimap<String, Metric> metrics = parseMetrics(metricsStr);
    Collection<Metric> metric = metrics.get("pulsar_consumers_count");

    // 6 non-partitioned topics + 3 partitioned topics x 3 partitions = 15 samples at least.
    assertTrue(metric.size() >= 15);
    metric.forEach(item -> {
        String namespace = item.tags.get("namespace");
        if (ns1.equals(namespace)) {
            assertEquals(item.tags.get("partition"), "-1");
        }
        if (ns2.equals(namespace)) {
            assertTrue(Integer.parseInt(item.tags.get("partition")) >= 0);
        }
    });
}
/**
 * Asserts that {@code cm} contains a single sample whose {@code cluster} label is
 * {@code "test"}, whose {@code broker} label is {@code "localhost"} and whose value is
 * {@code count}.
 *
 * @param cm    the samples parsed for one metric name
 * @param count the expected value of the only sample
 */
private void compareCompactionStateCount(List<Metric> cm, double count) {
    assertEquals(cm.size(), 1);
    Metric only = cm.get(0);
    String cluster = only.tags.get("cluster");
    String broker = only.tags.get("broker");
    assertEquals(cluster, "test");
    assertEquals(broker, "localhost");
    assertEquals(only.value, count);
}
/**
 * Verifies that in the generated output every sample line sits under the {@code # TYPE}
 * definition of the metric it belongs to. Samples named {@code <type>_bucket},
 * {@code _count}, {@code _sum}, {@code _total} or {@code _created} are attributed to
 * {@code <type>}.
 */
@Test
public void testMetricsGroupedByTypeDefinitions() throws Exception {
    // @Cleanup closes the producers even when the test fails part-way through.
    @Cleanup
    Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
    @Cleanup
    Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
    for (int i = 0; i < 10; i++) {
        String message = "my-message-" + i;
        p1.send(message.getBytes(StandardCharsets.UTF_8));
        p2.send(message.getBytes(StandardCharsets.UTF_8));
    }

    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
    // Decode with an explicit charset rather than the platform default.
    String metricsStr = statsOut.toString(StandardCharsets.UTF_8);

    Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
    Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
    List<String> derivedSuffixes = List.of("_bucket", "_count", "_sum", "_total", "_created");

    // Name taken from the most recent "# TYPE" line; null until the first one is seen.
    String currentType = null;
    for (String line : Splitter.on("\n").split(metricsStr)) {
        if (line.isEmpty()) {
            continue;
        }
        if (line.startsWith("#")) {
            // Get the current type definition
            Matcher typeMatcher = typePattern.matcher(line);
            checkArgument(typeMatcher.matches());
            currentType = typeMatcher.group(1);
            continue;
        }

        Matcher metricMatcher = metricNamePattern.matcher(line);
        checkArgument(metricMatcher.matches());
        if (currentType == null) {
            fail("Metric sample appears before any type definition: " + line);
        }

        String metricName = metricMatcher.group(1);
        for (String suffix : derivedSuffixes) {
            if (!metricName.endsWith(suffix)) {
                continue;
            }
            // "_bucket" is always stripped; the other suffixes only when the type name itself
            // does not end with them (e.g. a gauge that is really called "..._count").
            if (suffix.equals("_bucket") || !currentType.endsWith(suffix)) {
                // Cut the trailing suffix by length: indexOf() would cut at an earlier
                // occurrence of the same text inside the name.
                metricName = metricName.substring(0, metricName.length() - suffix.length());
            }
            break;
        }

        if (!metricName.equals(currentType)) {
            System.out.println(metricsStr);
            fail("Metric not grouped under its type definition: " + line);
        }
    }
}
/**
 * Hacky parsing of the Prometheus text format. Should be good enough for unit tests.
 *
 * <p>Empty lines and lines starting with {@code #} are skipped. Every other line must have the
 * form {@code name{label="value",...} number}; a line that does not match fails the calling
 * test. Label values are stored exactly as written in the input (escape sequences such as
 * {@code \"} are kept, not decoded), and labels with an empty value are not recorded.
 *
 * @param metrics the complete metrics text, one sample per line
 * @return a multimap from metric name to the samples parsed for that name
 */
public static Multimap<String, Metric> parseMetrics(String metrics) {
    Multimap<String, Metric> parsed = ArrayListMultimap.create();
    // Example of lines are
    // jvm_threads_current{cluster="standalone",} 203.0
    // or
    // pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
    // topic="persistent://public/default/test-2"} 0.0
    // NOTE: the label section ends at the first '}', so label values containing '}' are
    // not supported.
    Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)$");
    // A label value is a run of ordinary characters and backslash escapes, so an escaped
    // quote (\") inside the value no longer terminates it prematurely.
    Pattern tagsPattern = Pattern.compile("(\\w+)=\"((?:[^\"\\\\]|\\\\.)+)\"(,\\s?)?");
    Splitter.on("\n").split(metrics).forEach(line -> {
        if (line.isEmpty() || line.startsWith("#")) {
            return;
        }
        Matcher matcher = pattern.matcher(line);
        assertTrue(matcher.matches(), "line " + line + " does not match pattern " + pattern);
        String name = matcher.group(1);
        Metric m = new Metric();
        String numericValue = matcher.group(3);
        // Double.parseDouble does not understand the "+Inf"/"-Inf" spelling, so map it by hand.
        if (numericValue.equalsIgnoreCase("-Inf")) {
            m.value = Double.NEGATIVE_INFINITY;
        } else if (numericValue.equalsIgnoreCase("+Inf")) {
            m.value = Double.POSITIVE_INFINITY;
        } else {
            m.value = Double.parseDouble(numericValue);
        }
        String tags = matcher.group(2);
        Matcher tagsMatcher = tagsPattern.matcher(tags);
        while (tagsMatcher.find()) {
            String tag = tagsMatcher.group(1);
            String value = tagsMatcher.group(2);
            m.tags.put(tag, value);
        }
        parsed.put(name, m);
    });
    return parsed;
}
public static class Metric {
public Map<String, String> tags = new TreeMap<>();
public double value;
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
}
}
/**
 * Creates a topic whose name contains a double quote, subscribes to it, and checks that the
 * single {@code pulsar_subscriptions_count} line in the generated output carries the quote
 * escaped as {@code \"} inside the {@code topic} label.
 */
@Test
public void testEscapeLabelValue() throws Exception {
    final String namespace = "prop/ns-abc1";
    admin.namespaces().createNamespace(namespace);
    final String quotedTopic = "persistent://" + namespace + "/\"mytopic";
    admin.topics().createNonPartitionedTopic(quotedTopic);

    @Cleanup
    final Consumer<?> consumer = pulsarClient.newConsumer()
            .subscriptionName("sub")
            .topic(quotedTopic)
            .subscribe();

    @Cleanup
    ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
    PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
    final String metricsText = statsOut.toString();

    final String samplePrefix = "pulsar_subscriptions_count";
    final List<String> subCountLines = metricsText.lines()
            .filter(candidate -> candidate.startsWith(samplePrefix))
            .collect(Collectors.toList());
    System.out.println(subCountLines);

    final String expectedLine =
            "pulsar_subscriptions_count{cluster=\"test\",namespace=\"prop/ns-abc1\",topic=\"persistent://prop/ns-abc1/\\\"mytopic\"} 1";
    assertEquals(subCountLines.size(), 1);
    assertEquals(subCountLines.get(0), expectedLine);
}
}