blob: 5d30b0016c821bbf7692fca3c0778caec9ee5307 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.plugin;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.UUID;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.hamcrest.Matchers.containsInAnyOrder;
@RunWith(Parameterized.class)
public class MetricsPluginTest extends ActiveMQTestBase {
private boolean legacyConfig;
@Parameterized.Parameters(name = "legacyConfig={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][]{{true}, {false}});
}
public MetricsPluginTest(boolean legacyConfig) {
this.legacyConfig = legacyConfig;
}
protected ActiveMQServer server;
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(false, createDefaultInVMConfig());
if (legacyConfig) {
server.getConfiguration().setMetricsPlugin(new SimpleMetricsPlugin().init(null));
} else {
server.getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setPlugin(new SimpleMetricsPlugin().init(null)));
}
server.start();
locator = createInVMNonHALocator();
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(false, true, true));
}
@Test
public void testForArtemisMetricsPresence() throws Exception {
class Metric {
public final String name;
public final String description;
public final Double value;
private Metric(String name, String description, Double value) {
this.name = name;
this.description = description;
this.value = value;
}
@Override
public String toString() {
return name + ": " + value + " (" + description + ")";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Metric metric = (Metric) o;
return Objects.equals(name, metric.name) &&
Objects.equals(description, metric.description) &&
Objects.equals(value, metric.value);
}
@Override
public int hashCode() {
return Objects.hash(name, description, value);
}
}
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
Map<Meter.Id, Double> metrics = getMetrics();
List<Metric> artemisMetrics = metrics.entrySet().stream()
.map(entry -> new Metric(
entry.getKey().getName(),
entry.getKey().getDescription(),
entry.getValue()))
.filter(metric -> metric.name.startsWith("artemis"))
.collect(Collectors.toList());
// for (Metric metric : artemisMetrics) {
// IntegrationTestLogger.LOGGER.info(metric);
// }
assertThat(artemisMetrics, containsInAnyOrder(
// artemis.(un)routed.message.count is present twice, because of activemq.notifications address
new Metric("artemis.address.memory.usage", "Memory used by all the addresses on broker for in-memory messages", 0.0),
new Metric("artemis.address.memory.usage.percentage", "Memory used by all the addresses on broker as a percentage of the global-max-size", 0.0),
new Metric("artemis.connection.count", "Number of clients connected to this server", 1.0),
new Metric("artemis.consumer.count", "number of consumers consuming messages from this queue", 0.0),
new Metric("artemis.delivering.durable.message.count", "number of durable messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.delivering.durable.persistent.size", "persistent size of durable messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.delivering.message.count", "number of messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.delivering.persistent_size", "persistent size of messages that this queue is currently delivering to its consumers", 0.0),
new Metric("artemis.disk.store.usage", "Percentage of total disk store used", 0.0),
new Metric("artemis.durable.message.count", "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.durable.persistent.size", "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.message.count", "number of messages currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.messages.acknowledged", "number of messages acknowledged from this queue since it was created", 0.0),
new Metric("artemis.messages.added", "number of messages added to this queue since it was created", 0.0),
new Metric("artemis.messages.expired", "number of messages expired from this queue since it was created", 0.0),
new Metric("artemis.messages.killed", "number of messages removed from this queue since it was created due to exceeding the max delivery attempts", 0.0),
new Metric("artemis.persistent.size", "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)", 0.0),
new Metric("artemis.routed.message.count", "number of messages routed to one or more bindings", 0.0),
new Metric("artemis.routed.message.count", "number of messages routed to one or more bindings", 0.0),
new Metric("artemis.scheduled.durable.message.count", "number of durable scheduled messages in this queue", 0.0),
new Metric("artemis.scheduled.durable.persistent.size", "persistent size of durable scheduled messages in this queue", 0.0),
new Metric("artemis.scheduled.message.count", "number of scheduled messages in this queue", 0.0),
new Metric("artemis.scheduled.persistent.size", "persistent size of scheduled messages in this queue", 0.0),
new Metric("artemis.total.connection.count", "Number of clients which have connected to this server since it was started", 1.0),
new Metric("artemis.unrouted.message.count", "number of messages not routed to any bindings", 0.0),
new Metric("artemis.unrouted.message.count", "number of messages not routed to any bindings", 2.0),
new Metric("artemis.address.size", "the number of estimated bytes being used by all the queue(s) bound to this address; used to control paging and blocking", 0.0),
new Metric("artemis.address.size", "the number of estimated bytes being used by all the queue(s) bound to this address; used to control paging and blocking", 0.0)
));
}
@Test
public void testForBasicMetricsPresenceAndValue() throws Exception {
internalTestForBasicMetrics(true);
}
@Test
public void testDisablingMetrics() throws Exception {
internalTestForBasicMetrics(false);
}
private void internalTestForBasicMetrics(boolean enabled) throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";
server.getAddressSettingsRepository().getMatch(addressName).setEnableMetrics(enabled);
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(data);
producer.send(message);
producer.close();
Queue queue = server.locateQueue(queueName);
Wait.assertEquals(1, queue::getMessageCount);
Map<Meter.Id, Double> metrics = getMetrics();
checkMetric(metrics, "artemis.message.count", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0, enabled);
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0, enabled);
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0, enabled);
ClientConsumer consumer = session.createConsumer(queueName);
session.start();
message = consumer.receive(1000);
assertNotNull(message);
metrics = getMetrics();
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 1.0, enabled);
message.acknowledge();
assertEquals(data, message.getBodyBuffer().readString());
session.commit(); // force the ack to be committed
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)).getMessagesAcknowledged() == 1, 1000, 100));
consumer.close();
metrics = getMetrics();
checkMetric(metrics, "artemis.message.count", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.messages.added", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.messages.acknowledged", "queue", queueName, 1.0, enabled);
checkMetric(metrics, "artemis.durable.message.count", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.delivering.message.count", "queue", queueName, 0.0, enabled);
checkMetric(metrics, "artemis.routed.message.count", "address", addressName, 1.0, enabled);
checkMetric(metrics, "artemis.unrouted.message.count", "address", addressName, 0.0, enabled);
checkMetric(metrics, "artemis.consumer.count", "queue", queueName, 0.0, enabled);
}
@Test
public void testMessageCountWithPaging() throws Exception {
final String data = "Simple Text " + UUID.randomUUID().toString();
final String queueName = "simpleQueue";
final String addressName = "simpleAddress";
server.getAddressSettingsRepository().getMatch(addressName).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxSizeBytes(1024 * 10).setPageSizeBytes(1024 * 5);
session.createQueue(new QueueConfiguration(queueName).setAddress(addressName).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer(addressName);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(data);
long messageCount = 0;
while (!server.getPagingManager().getPageStore(new SimpleString(addressName)).isPaging()) {
producer.send(message);
messageCount++;
}
Wait.assertEquals(messageCount, server.locateQueue(queueName)::getMessageCount, 2000, 100);
checkMetric(getMetrics(), "artemis.message.count", "queue", queueName, Double.valueOf(messageCount));
for (int i = 0; i < messageCount; i++) {
producer.send(message);
}
producer.close();
Wait.assertEquals(messageCount * 2, server.locateQueue(queueName)::getMessageCount, 2000, 100);
checkMetric(getMetrics(), "artemis.message.count", "queue", queueName, Double.valueOf(messageCount * 2));
}
public Map<Meter.Id, Double> getMetrics() {
return getMetrics(server);
}
public static Map<Meter.Id, Double> getMetrics(ActiveMQServer server) {
Map<Meter.Id, Double> metrics = new HashMap<>();
List<Meter> meters = server.getMetricsManager().getMeterRegistry().getMeters();
assertTrue(meters.size() > 0);
for (Meter meter : meters) {
Iterable<Measurement> measurements = meter.measure();
for (Measurement measurement : measurements) {
metrics.put(meter.getId(), measurement.getValue());
}
}
return metrics;
}
public void checkMetric(Map<Meter.Id, Double> metrics, String metric, String tag, String tagValue, Double expectedValue) {
checkMetric(metrics, metric, tag, tagValue, expectedValue, true);
}
public void checkMetric(Map<Meter.Id, Double> metrics, String metric, String tag, String tagValue, Double expectedValue, boolean enabled) {
OptionalDouble actualValue = metrics.entrySet().stream()
.filter(entry -> metric.equals(entry.getKey().getName()))
.filter(entry -> tagValue.equals(entry.getKey().getTag(tag)))
.mapToDouble(Map.Entry::getValue)
.findFirst();
if (enabled) {
assertTrue(metric + " for " + tag + " " + tagValue + " not present", actualValue.isPresent());
assertEquals(metric + " not equal", expectedValue, actualValue.getAsDouble(), 0);
} else {
assertFalse(metric + " for " + tag + " " + tagValue + " present", actualValue.isPresent());
}
}
}