| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.resourcegroup; |
| |
| import com.google.common.collect.Sets; |
| import io.prometheus.client.Summary; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; |
| import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; |
| import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType; |
| import org.apache.pulsar.broker.service.BrokerService; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.ProducerConsumerBase; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| |
| // The tests implement a set of producer/consumer operations on a set of topics. |
| // [A thread is started for each producer and each consumer in the test.] |
| // The tenants and namespaces in those topics are associated with a set of resource-groups (RGs). |
| // After all the messages have been sent and received, the traffic-usage statistics and |
| // Prometheus metrics on the RGs are verified. |
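| // For instance, with the default parameters below (NUM_PRODUCERS=4, NUM_CONSUMERS=4, NUM_TOPICS=8, |
| // NUM_RESOURCE_GROUPS=4), producer-0 publishes to persistent://rg-0/rg-0/rgusage-topic-0 in the |
| // "same tenant and NS RGs" cases, while every consumer subscribes to all of the topics. |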
| @Slf4j |
| public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { |
| @BeforeClass |
| @Override |
| protected void setup() throws Exception { |
| super.internalSetup(); |
| this.prepareForOps(); |
| |
| ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() { |
| @Override |
| public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes, |
| long currentMessagesUsed, long lastReportedMessages, |
| long lastReportTimeMSecsSinceEpoch) { |
| // Pretend to report every time, just to see the RG-metrics increasing. |
| numLocalUsageReports++; |
| return true; |
| } |
| |
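| // Quota computation is not exercised by this test; the dummy calculator returns a constant 0. |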
| @Override |
| public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) { |
| return 0; |
| } |
| }; |
| |
| ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(pulsar); |
| this.rgservice = new ResourceGroupService(pulsar, TimeUnit.SECONDS, transportMgr, dummyQuotaCalc); |
| |
| this.prepareRGs(); |
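| // Brief settling delay (presumably to let the RG service's periodic tasks spin up) before the tests run. |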
| Thread.sleep(2000); |
| } |
| |
| @AfterClass(alwaysRun = true) |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| @Test |
| public void testMTProduceConsumeRGUsagePersistentTopicNamesSameTenant() throws Exception { |
| testProduceConsumeUsageOnRG(PersistentTopicNamesSameTenantAndNsRGs); |
| } |
| |
| @Test |
| public void testMTProduceConsumeRGUsagePersistentTopicNamesDifferentTenant() throws Exception { |
| testProduceConsumeUsageOnRG(PersistentTopicNamesDifferentTenantAndNsRGs); |
| } |
| |
| @Test |
| public void testMTProduceConsumeRGUsageNonPersistentTopicNamesSameTenant() throws Exception { |
| testProduceConsumeUsageOnRG(NonPersistentTopicNamesSameTenantAndNsRGs); |
| } |
| |
| @Test |
| public void testMTProduceConsumeRGUsageNonPersistentTopicNamesDifferentTenant() throws Exception { |
| testProduceConsumeUsageOnRG(NonPersistentTopicNamesDifferentTenantAndNsRGs); |
| } |
| |
| // A class which implements the producer; the main test thread will spawn multiple producers. |
| private class ProduceMessages implements Runnable { |
| private final int producerId; |
| private final int numMesgsToProduce; |
| private final String myProduceTopic; |
| |
| private int sentNumBytes = 0; |
| private int sentNumMsgs = 0; |
| private int numExceptions = 0; |
| |
| ProduceMessages(int prodId, int nMesgs, String[] topics) { |
| producerId = prodId; |
| numMesgsToProduce = nMesgs; |
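| // Producer ix maps deterministically to topics[ix % NUM_TOPICS]; with NUM_TOPICS >= NUM_PRODUCERS, |
| // each producer publishes on its own topic. |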
| myProduceTopic = topics[producerId % NUM_TOPICS]; |
| } |
| |
| public int getNumBytesSent() { |
| return sentNumBytes; |
| } |
| |
| public int getNumMessagesSent() { |
| return sentNumMsgs; |
| } |
| |
| public int getNumExceptions() { |
| return numExceptions; |
| } |
| |
| @Override |
| public void run() { |
| Producer<byte[]> producer = null; |
| |
| try { |
| // The producer sends to one specific topic; it doesn't make sense for a producer to be |
| // vague about the destination topic (nor do the Pulsar APIs allow it). |
| producer = pulsarClient.newProducer() |
| .topic(myProduceTopic) |
| .create(); |
| } catch (PulsarClientException p) { |
| numExceptions++; |
| log.error("Producer={} got exception while building producer: ex={}", |
| producerId, p.getMessage()); |
| return; // Can't send anything without a producer. |
| } |
| |
| for (int ix = 0; ix < numMesgsToProduce; ix++) { |
| byte[] mesg; |
| try { |
| mesg = String.format("ProducerId=%d, ix=%d, topic=%s", producerId, ix, myProduceTopic).getBytes(); |
| MessageId msgId = producer.send(mesg); |
| sentNumBytes += mesg.length; |
| sentNumMsgs++; |
| log.debug("Producer={}, sent msg-ix={}, msgId={}", producerId, ix, msgId); |
| } catch (PulsarClientException e) { |
| numExceptions++; |
| log.error("Producer={} got exception while sending {}-th time: ex={}", |
| producerId, ix, e.getMessage()); |
| } |
| } |
| try { |
| producer.flush(); |
| producer.close(); |
| } catch (PulsarClientException e) { |
| numExceptions++; |
| log.error("Producer={} got exception while closing producer: ex={}", |
| producerId, e.getMessage()); |
| } |
| |
| log.info("Producer={} done with topic={}; got {} exceptions", producerId, myProduceTopic, numExceptions); |
| } |
| } |
| |
| // Track the producer object, and the thread using it. |
| class ProducerWithThread { |
| ProduceMessages producer; |
| Thread thread; |
| } |
| |
| // A class which implements the consumer; the main test thread will spawn multiple consumers. |
| private class ConsumeMessages implements Runnable { |
| private final int consumerId; |
| private final int numMesgsForThisConsumer; |
| private final int numTotalMesgsToConsume; |
| private final SubscriptionType subscriptionType; |
| |
| private final String[] topicStrings; |
| private Consumer<byte[]> consumer = null; |
| |
| private final int recvTimeoutMilliSecs = 1000; |
| private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second |
| private int recvdNumBytes = 0; |
| private int recvdNumMsgs = 0; |
| private int numExceptions = 0; |
| private volatile boolean allMessagesReceived = false; |
| private volatile boolean consumerIsReady = false; |
| |
| ConsumeMessages(int consId, int nMesgs, int totalMesgs, SubscriptionType subType, String[] topics) { |
| consumerId = consId; |
| numMesgsForThisConsumer = nMesgs; |
| numTotalMesgsToConsume = totalMesgs; |
| subscriptionType = subType; |
| topicStrings = topics; |
| } |
| |
| public boolean isConsumerReady() { |
| return consumerIsReady; |
| } |
| |
| public int getNumBytesRecvd() { |
| return recvdNumBytes; |
| } |
| |
| public int getNumMessagesRecvd() { |
| return recvdNumMsgs; |
| } |
| |
| public int getNumExceptions() { |
| return numExceptions; |
| } |
| |
| public void setAllMessagesReceived() { |
| allMessagesReceived = true; |
| } |
| |
| public void closeConsumer() { |
| try { |
| consumerIsReady = false; |
| if (consumer != null) { |
| consumer.close(); |
| } |
| } catch (PulsarClientException p) { |
| numExceptions++; |
| log.error("Consumer={} got exception while closing consumer: ex={}", |
| consumerId, p.getMessage()); |
| } |
| } |
| |
| @Override |
| public void run() { |
| // Create a consumer and subscription, with enough receiver-queue space that messages are held for consumption. |
| int recvQueueSize = 0; |
| String subscriptionString = null; |
| switch (subscriptionType) { |
| default: |
| numExceptions++; |
| final String errMesg = String.format("Consumer=%d got unexpected subscription type=%s", |
| consumerId, subscriptionType); |
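| // Note: an AssertionError raised on a spawned thread does not fail the TestNG test by itself; |
| // the main thread also checks getNumExceptions(). |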
| Assert.fail(errMesg); |
| break; |
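| // On a shared subscription, any single consumer might receive all of the messages; |
| // size the receiver queue for the total. |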
| case Shared: |
| recvQueueSize = numTotalMesgsToConsume; |
| subscriptionString = "my-subscription"; |
| break; |
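| // On exclusive subscriptions, each consumer gets its own subscription name; |
| // size the queue for this consumer's expected share. |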
| case Exclusive: |
| recvQueueSize = numMesgsForThisConsumer; |
| subscriptionString = "my-subscription-" + consumerId; |
| break; |
| } |
| |
| try { |
| // The consumer will try to get a message from any of the topics, since Pulsar allows a consumer to |
| // be subscribed to multiple topics. |
| consumer = pulsarClient.newConsumer() |
| .topic(topicStrings) |
| .subscriptionName(subscriptionString) |
| .subscriptionType(subscriptionType) |
| .receiverQueueSize(recvQueueSize) |
| .ackTimeout(ackTimeoutMilliSecs, TimeUnit.MILLISECONDS) |
| .subscribe(); |
| } catch (PulsarClientException p) { |
| numExceptions++; |
| log.error("Consumer={} got exception while building consumer: ex={}", |
| consumerId, p.getMessage()); |
| return; // Can't receive anything without a consumer. |
| } |
| |
| Message<byte[]> message; |
| consumerIsReady = true; |
| while (consumerIsReady && !allMessagesReceived) { |
| log.debug("Consumer={} waiting for mesgnum={}", consumerId, recvdNumMsgs); |
| try { |
| message = consumer.receive(recvTimeoutMilliSecs, TimeUnit.MILLISECONDS); |
| if (message != null) { |
| consumer.acknowledgeAsync(message); |
| String mesg = String.format("Consumer=%d recvd %d-th mesg; id=%s, data=%s", |
| consumerId, recvdNumMsgs, message.getMessageId(), new String(message.getData())); |
| log.debug(mesg); |
| recvdNumBytes += message.getValue().length; |
| recvdNumMsgs++; |
| } |
| } catch (PulsarClientException p) { |
| numExceptions++; |
| log.error("Consumer={} got exception in while receiving {}-th mesg at consumer: ex={}", |
| consumerId, recvdNumMsgs, p.getMessage()); |
| } |
| } |
| |
| log.debug("Consumer={} done; got {} exceptions", consumerId, numExceptions); |
| } |
| } |
| |
| // Track the consumer object, and the thread using it. |
| class ConsumerWithThread { |
| ConsumeMessages consumer; |
| Thread thread; |
| } |
| |
| // Given a topic, get the tenant RG-name. |
| private String TopicToTenantRGName(TopicName topicName) { |
| // Under the current topic naming scheme, the tenant-RG name is just the tenant part of the topic. |
| return topicName.getTenant(); |
| } |
| |
| // Given a topic, get the namespace RG-name. |
| private String TopicToNamespaceRGName(TopicName topicName) { |
| // Under the current topic naming scheme, the namespace-RG name is just the namespace part of the topic. |
| return topicName.getNamespacePortion(); |
| } |
| |
| // Return true if the tenant-RG == namespace-RG in the topics given, false otherwise. |
| // If some are equal, and others unequal, throw, because this is unexpected in this UT at the moment. |
| private boolean tenantRGEqualsNamespaceRG(String[] topicStrings) throws PulsarClientException { |
| int numEqualRGs = 0; |
| int numUnEqualRGs = 0; |
| int numTopics = topicStrings.length; |
| for (String topicStr : topicStrings) { |
| TopicName topic = TopicName.get(topicStr); |
| String tenantRG = TopicToTenantRGName(topic); |
| String namespaceRG = TopicToNamespaceRGName(topic); |
| if (tenantRG.equals(namespaceRG)) { |
| numEqualRGs++; |
| } else { |
| numUnEqualRGs++; |
| } |
| } |
| if ((numEqualRGs + numUnEqualRGs != numTopics) || (numEqualRGs > 0 && numUnEqualRGs > 0)) { |
| String errMesg = String.format("Found %s topics with equal RGs and %s with unequal, on %s topics", |
| numEqualRGs, numUnEqualRGs, numTopics); |
| throw new PulsarClientException(errMesg); |
| } else { |
| return numEqualRGs == numTopics; |
| } |
| } |
| |
| private void registerTenantsAndNamespaces(String[] topicStrings) throws Exception { |
| for (String topicStr : topicStrings) { |
| final TopicName topic = TopicName.get(topicStr); |
| final String tenantRG = TopicToTenantRGName(topic); |
| final String namespaceRG = TopicToNamespaceRGName(topic); |
| final NamespaceName ns = topic.getNamespaceObject(); |
| |
| // The tenant name and namespace name parts of the topic are the same as their corresponding RG-names. |
| // Hence, the arguments to register look a little odd. |
| if (!registeredTenants.contains(tenantRG)) { |
| this.rgservice.registerTenant(tenantRG, tenantRG); |
| registeredTenants.add(tenantRG); |
| } |
| if (!registeredNamespaces.contains(namespaceRG)) { |
| this.rgservice.registerNameSpace(namespaceRG, ns); |
| registeredNamespaces.add(namespaceRG); |
| } |
| } |
| } |
| |
| private void unRegisterTenantsAndNamespaces(String[] topicStrings) throws Exception { |
| for (String topicStr : topicStrings) { |
| final TopicName topic = TopicName.get(topicStr); |
| final String tenantRG = TopicToTenantRGName(topic); |
| final String namespaceRG = TopicToNamespaceRGName(topic); |
| final String tenantAndNamespace = topic.getNamespace(); |
| |
| // The tenant name and namespace name parts of the topic are the same as their corresponding RG-names. |
| // Hence, the arguments to unRegister look a little odd. |
| if (registeredTenants.contains(tenantRG)) { |
| this.rgservice.unRegisterTenant(tenantRG, tenantRG); |
| registeredTenants.remove(tenantRG); |
| } |
| if (registeredNamespaces.contains(namespaceRG)) { |
| this.rgservice.unRegisterNameSpace(namespaceRG, NamespaceName.get(tenantAndNamespace)); |
| registeredNamespaces.remove(namespaceRG); |
| } |
| } |
| } |
| |
| // Produce/consume messages on the given topics, and verify that the resource-group stats are updated. |
| private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception { |
| createRGs(); |
| // Creating the topics here currently exposes a regression; the call can be put back |
| // after https://github.com/apache/pulsar/issues/11289 is fixed. |
| // createTopics(topicStrings); |
| registerTenantsAndNamespaces(topicStrings); |
| |
| final int TotalExpectedMessagesToSend = NUM_TOTAL_MESSAGES; |
| final int TotalExpectedMessagesToReceive = TotalExpectedMessagesToSend; |
| |
| final SubscriptionType consumeSubscriptionType = SubscriptionType.Shared; // Shared, or Exclusive |
| |
| ProducerWithThread[] prodThr = new ProducerWithThread[NUM_PRODUCERS]; |
| ConsumerWithThread[] consThr = new ConsumerWithThread[NUM_CONSUMERS]; |
| int sentNumBytes = 0; |
| int sentNumMsgs = 0; |
| int numProducerExceptions = 0; |
| |
| // Fork some consumers to receive the messages. |
| for (int ix = 0; ix < NUM_CONSUMERS; ix++) { |
| consThr[ix] = new ConsumerWithThread(); |
| ConsumeMessages cm = new ConsumeMessages(ix, NUM_MESSAGES_PER_CONSUMER, TotalExpectedMessagesToReceive, |
| consumeSubscriptionType, topicStrings); |
| Thread thr = new Thread(cm); |
| thr.start(); |
| consThr[ix].consumer = cm; |
| consThr[ix].thread = thr; |
| } |
| |
| // Wait for all consumers to be ready, before forking producers, so we don't lose messages. |
| int numReadyConsumers; |
| do { |
| Thread.sleep(500); |
| numReadyConsumers = 0; |
| for (int ix = 0; ix < NUM_CONSUMERS; ix++) { |
| if (consThr[ix].consumer.isConsumerReady()) { |
| numReadyConsumers++; |
| } |
| } |
| log.debug("{} consumers are not yet ready", NUM_CONSUMERS - numReadyConsumers); |
| } while (numReadyConsumers < NUM_CONSUMERS); |
| |
| // Fork some producers to send the messages. |
| for (int ix = 0; ix < NUM_PRODUCERS; ix++) { |
| prodThr[ix] = new ProducerWithThread(); |
| ProduceMessages pm = new ProduceMessages(ix, NUM_MESSAGES_PER_PRODUCER, topicStrings); |
| Thread thr = new Thread(pm); |
| thr.start(); |
| prodThr[ix].producer = pm; |
| prodThr[ix].thread = thr; |
| } |
| |
| // Wait for the producers to complete. |
| int sentMsgs, sentBytes; |
| for (int ix = 0; ix < NUM_PRODUCERS; ix++) { |
| prodThr[ix].thread.join(); |
| sentBytes = prodThr[ix].producer.getNumBytesSent(); |
| sentMsgs = prodThr[ix].producer.getNumMessagesSent(); |
| numProducerExceptions += prodThr[ix].producer.getNumExceptions(); |
| |
| log.debug("Producer={} sent {} mesgs and {} bytes", ix, sentMsgs, sentBytes); |
| sentNumBytes += sentBytes; |
| sentNumMsgs += sentMsgs; |
| } |
| Assert.assertEquals(sentNumMsgs, TotalExpectedMessagesToSend); |
| Assert.assertEquals(numProducerExceptions, 0); |
| |
| int recvdNumMsgs; |
| int numConsumerExceptions = 0; |
| |
| // Wait for the consumers to receive all the messages. |
| do { |
| Thread.sleep(2000); |
| recvdNumMsgs = 0; |
| int consNumMesgsRecvd; |
| for (int ix = 0; ix < NUM_CONSUMERS; ix++) { |
| consNumMesgsRecvd = consThr[ix].consumer.getNumMessagesRecvd(); |
| recvdNumMsgs += consNumMesgsRecvd; |
| log.debug("consumer={} received {} messages (current total {}, expected {})", |
| ix, consNumMesgsRecvd, recvdNumMsgs, TotalExpectedMessagesToReceive); |
| } |
| } while (recvdNumMsgs < TotalExpectedMessagesToReceive); |
| |
| // Tell the consumers that all expected messages have been received (but don't close them yet). |
| for (int ix = 0; ix < NUM_CONSUMERS; ix++) { |
| consThr[ix].consumer.setAllMessagesReceived(); |
| log.debug("consumer={} told to stop", ix); |
| } |
| |
| boolean[] joinedConsumers = new boolean[NUM_CONSUMERS]; |
| |
| int recvdNumBytes = 0; |
| recvdNumMsgs = 0; |
| int numConsumersDone = 0; |
| int recvdMsgs, recvdBytes; |
| while (numConsumersDone < NUM_CONSUMERS) { |
| for (int ix = 0; ix < NUM_CONSUMERS; ix++) { |
| if (!joinedConsumers[ix]) { |
| recvdBytes = consThr[ix].consumer.getNumBytesRecvd(); |
| recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd(); |
| numConsumerExceptions += consThr[ix].consumer.getNumExceptions(); |
| log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes); |
| |
| consThr[ix].thread.join(); |
| joinedConsumers[ix] = true; |
| log.debug("Joined consumer={}", ix); |
| |
| recvdNumBytes += recvdBytes; |
| recvdNumMsgs += recvdMsgs; |
| numConsumersDone++; |
| } |
| } |
| } |
| |
| // Close the consumers. |
| for (int ix = 0; ix < NUM_CONSUMERS; ix++) { |
| consThr[ix].consumer.closeConsumer(); |
| } |
| |
| Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive); |
| Assert.assertEquals(numConsumerExceptions, 0); |
| |
| boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings); |
| // If the tenant and NS are on different RGs, the bytes/messages get counted once on the |
| // tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG. |
| // This is a known (and discussed) artifact in the implementation. |
| // 'ScaleFactor' is a way to incorporate that effect in the verification. |
| final int scaleFactor = tenantRGEqualsNsRG ? 1 : 2; |
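| // E.g., with the default parameters (400 messages sent and received) and tenant-RG != NS-RG, |
| // the combined RG counts below are expected to show 800 messages (scaleFactor == 2). |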
| |
| // Verify producer and consumer side stats. |
| this.verifyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true); |
| |
| // Verify the metrics corresponding to the operations in this test. |
| this.verifyRGMetrics(sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, scaleFactor, true, true); |
| |
| unRegisterTenantsAndNamespaces(topicStrings); |
| // destroyTopics can be called after createTopics() is added back |
| // (see comment above regarding https://github.com/apache/pulsar/issues/11289). |
| // destroyTopics(topicStrings); |
| destroyRGs(); |
| } |
| |
| // Verify the application-level stats against what we see from the broker service, and from the |
| // resource-groups (which in turn derive their stats from the broker service). |
| private void verifyRGProdConsStats(String[] topicStrings, |
| int sentNumBytes, int sentNumMsgs, |
| int recvdNumBytes, int recvdNumMsgs, |
| int scaleFactor, boolean checkProduce, |
| boolean checkConsume) throws Exception { |
| |
| BrokerService bs = pulsar.getBrokerService(); |
| Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats(); |
| |
| log.debug("verifyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size()); |
| |
| // The Pulsar runtime adds overhead bytes in the exchanges: roughly 45 bytes of per-message |
| // metadata, plus more as the number of messages increases. |
| // Hence the ">=" assertions against ExpectedNumBytesSent/Received in the following checks. |
| final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs; |
| final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs; |
| |
| long totalOutMessages = 0, totalOutBytes = 0; |
| long totalInMessages = 0, totalInBytes = 0; |
| BytesAndMessagesCount totalTenantRGProdCounts = new BytesAndMessagesCount(); |
| BytesAndMessagesCount totalTenantRGConsCounts = new BytesAndMessagesCount(); |
| BytesAndMessagesCount totalNsRGProdCounts = new BytesAndMessagesCount(); |
| BytesAndMessagesCount totalNsRGConsCounts = new BytesAndMessagesCount(); |
| BytesAndMessagesCount prodCounts, consCounts; |
| |
| // Since the following walk is on topics, keep track of the RGs for which we have already gathered stats, |
| // so that we do not double-accumulate stats if multiple topics refer to the same RG. |
| HashSet<String> RGsWithPublishStatsGathered = new HashSet<>(); |
| HashSet<String> RGsWithDispatchStatsGathered = new HashSet<>(); |
| |
| // Hack to ensure aggregator calculation without waiting for a period of aggregation. |
| // [aggregateResourceGroupLocalUsages() is idempotent when there's no fresh traffic flowing.] |
| this.rgservice.aggregateResourceGroupLocalUsages(); |
| |
| for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) { |
| String mapTopicName = entry.getKey(); |
| if (Arrays.asList(topicStrings).contains(mapTopicName)) { |
| TopicStats stats = entry.getValue(); |
| totalInMessages += stats.getMsgInCounter(); |
| totalInBytes += stats.getBytesInCounter(); |
| totalOutMessages += stats.getMsgOutCounter(); |
| totalOutBytes += stats.getBytesOutCounter(); |
| |
| // Assuming that broker-service stats-gathering is doing its job, |
| // we should see some produced mesgs on every topic that carried traffic. |
| if (stats.getMsgInCounter() == 0) { |
| log.warn("verifyProdConsStats: found no produced mesgs (msgInCounter) on topic {}", mapTopicName); |
| } |
| |
| if (sentNumMsgs > 0 || recvdNumMsgs > 0) { |
| TopicName topic = TopicName.get(mapTopicName); |
| |
| final String tenantRGName = TopicToTenantRGName(topic); |
| if (!RGsWithPublishStatsGathered.contains(tenantRGName)) { |
| prodCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Publish, |
| getCumulativeUsageStats); |
| totalTenantRGProdCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, prodCounts); |
| RGsWithPublishStatsGathered.add(tenantRGName); |
| } |
| if (!RGsWithDispatchStatsGathered.contains(tenantRGName)) { |
| consCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Dispatch, |
| getCumulativeUsageStats); |
| totalTenantRGConsCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, consCounts); |
| RGsWithDispatchStatsGathered.add(tenantRGName); |
| } |
| |
| final String nsRGName = TopicToNamespaceRGName(topic); |
| // If tenantRGName == nsRGName, the RG-infra will avoid double counting. |
| // We will do the same here, to get the expected stats. |
| if (!tenantRGName.equals(nsRGName)) { |
| if (!RGsWithPublishStatsGathered.contains(nsRGName)) { |
| prodCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Publish, |
| getCumulativeUsageStats); |
| totalNsRGProdCounts = ResourceGroup.accumulateBMCount(totalNsRGProdCounts, prodCounts); |
| RGsWithPublishStatsGathered.add(nsRGName); |
| } |
| if (!RGsWithDispatchStatsGathered.contains(nsRGName)) { |
| consCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Dispatch, |
| getCumulativeUsageStats); |
| totalNsRGConsCounts = ResourceGroup.accumulateBMCount(totalNsRGConsCounts, consCounts); |
| RGsWithDispatchStatsGathered.add(nsRGName); |
| } |
| } |
| } |
| } |
| } |
| |
| // Check that the accumulated totals tally up. |
| if (checkConsume && checkProduce) { |
| Assert.assertEquals(totalOutMessages, totalInMessages); |
| Assert.assertEquals(totalOutBytes, totalInBytes); |
| } |
| |
| if (checkProduce) { |
| Assert.assertEquals(totalInMessages, sentNumMsgs); |
| Assert.assertTrue(totalInBytes >= ExpectedNumBytesSent); |
| } |
| |
| if (checkConsume) { |
| Assert.assertEquals(totalOutMessages, recvdNumMsgs); |
| Assert.assertTrue(totalOutBytes >= ExpectedNumBytesReceived); |
| } |
| |
| if (checkProduce) { |
| prodCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts); |
| Assert.assertEquals(prodCounts.messages, sentNumMsgs * scaleFactor); |
| Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent); |
| } |
| |
| if (checkConsume) { |
| consCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts); |
| Assert.assertEquals(consCounts.messages, recvdNumMsgs * scaleFactor); |
| Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived); |
| } |
| } |
| |
| // Check the metrics for the RGs involved |
| private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs, |
| int recvdNumBytes, int recvdNumMsgs, |
| int scaleFactor, boolean checkProduce, |
| boolean checkConsume) throws Exception { |
| final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs; |
| final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs; |
| long totalTenantRegisters = 0; |
| long totalTenantUnRegisters = 0; |
| long totalNamespaceRegisters = 0; |
| long totalNamespaceUnRegisters = 0; |
| long[] totalQuotaBytes = new long[ResourceGroupMonitoringClass.values().length]; |
| long[] totalQuotaMessages = new long[ResourceGroupMonitoringClass.values().length]; |
| long[] totalUsedBytes = new long[ResourceGroupMonitoringClass.values().length]; |
| long[] totalUsedMessages = new long[ResourceGroupMonitoringClass.values().length]; |
| long[] totalUsageReportCounts = new long[ResourceGroupMonitoringClass.values().length]; |
| long totalUpdates = 0; |
| |
| // Hack to ensure aggregator calculation without waiting for a period of aggregation. |
| // [aggregateResourceGroupLocalUsages() is idempotent when there's no new traffic flowing.] |
| this.rgservice.aggregateResourceGroupLocalUsages(); |
| |
| for (String rgName : RGNames) { |
| for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) { |
| String mcName = mc.name(); |
| int mcIndex = mc.ordinal(); |
| double quotaBytes = ResourceGroupService.getRgQuotaByteCount(rgName, mcName); |
| totalQuotaBytes[mcIndex] += quotaBytes; |
| double quotaMesgs = ResourceGroupService.getRgQuotaMessageCount(rgName, mcName); |
| totalQuotaMessages[mcIndex] += quotaMesgs; |
| double usedBytes = ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName); |
| totalUsedBytes[mcIndex] += usedBytes; |
| double usedMesgs = ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName); |
| totalUsedMessages[mcIndex] += usedMesgs; |
| |
| double usageReportedCount = ResourceGroup.getRgUsageReportedCount(rgName, mcName); |
| totalUsageReportCounts[mcIndex] += usageReportedCount; |
| } |
| |
| totalTenantRegisters += ResourceGroupService.getRgTenantRegistersCount(rgName); |
| totalTenantUnRegisters += ResourceGroupService.getRgTenantUnRegistersCount(rgName); |
| totalNamespaceRegisters += ResourceGroupService.getRgNamespaceRegistersCount(rgName); |
| totalNamespaceUnRegisters += ResourceGroupService.getRgNamespaceUnRegistersCount(rgName); |
| totalUpdates += ResourceGroupService.getRgUpdatesCount(rgName); |
| } |
| log.info("totalTenantRegisters={}, totalTenantUnRegisters={}, " + |
| "totalNamespaceRegisters={}, totalNamespaceUnRegisters={}", |
| totalTenantRegisters, totalTenantUnRegisters, totalNamespaceRegisters, totalNamespaceUnRegisters); |
| |
| // On each run, there will be NUM_RESOURCE_GROUPS tenant and namespace registrations. |
| Assert.assertEquals(totalTenantRegisters - residualTenantRegs, NUM_RESOURCE_GROUPS); |
| Assert.assertEquals(totalNamespaceRegisters - residualNamespaceRegs, NUM_RESOURCE_GROUPS); |
| |
| // The unregisters will lag the registers by one round (because verifyRGMetrics() is called |
| // prior to unregister). In other words, their numbers will equal the residuals for the registers. |
| Assert.assertEquals(totalTenantUnRegisters, residualTenantRegs); |
| Assert.assertEquals(totalNamespaceUnRegisters, residualNamespaceRegs); |
| |
| // Update residuals for next test run. |
| residualTenantRegs = totalTenantRegisters; |
| residualNamespaceRegs = totalNamespaceRegisters; |
| |
| for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) { |
| int mcIdx = mc.ordinal(); |
| log.info("mc={}: totalQuotaBytes={}, totalQuotaMessages={}, " + |
| " totalUsedBytes={}, totalUsedMessages={}" + |
| " totalUsageReports={}", |
| mc.name(), totalQuotaBytes[mcIdx], totalQuotaMessages[mcIdx], |
| totalUsedBytes[mcIdx], totalUsedMessages[mcIdx], totalUsageReportCounts[mcIdx]); |
| // On each run, the bytes/messages are monotone incremented in Prometheus metrics. |
| // So, we take the residuals into account when comparing against the expected. |
| if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) { |
| Assert.assertEquals(totalUsedMessages[mcIdx] - residualSentNumMessages, |
| sentNumMsgs * scaleFactor); |
| Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes |
| >= ExpectedNumBytesSent); |
| } else if (checkConsume && mc == ResourceGroupMonitoringClass.Dispatch) { |
| Assert.assertEquals(totalUsedMessages[mcIdx] - residualRecvdNumMessages, |
| recvdNumMsgs * scaleFactor); |
| Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes |
| >= ExpectedNumBytesReceived); |
| } |
| |
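| // numLocalUsageReports counts every needToReportLocalUsage() call; the calls are expected to be |
| // spread evenly across the monitoring classes, hence the division. |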
| long perClassUsageReports = numLocalUsageReports / ResourceGroupMonitoringClass.values().length; |
| Assert.assertEquals(totalUsageReportCounts[mcIdx], perClassUsageReports); |
| } |
| |
| // Update the residuals for next round of tests. |
| residualSentNumBytes += sentNumBytes; |
| residualSentNumMessages += sentNumMsgs * scaleFactor; |
| residualRecvdNumBytes += recvdNumBytes; |
| residualRecvdNumMessages += recvdNumMsgs * scaleFactor; |
| |
| Assert.assertEquals(totalUpdates, 0); // currently, we don't update the RGs in this UT |
| |
| // Basic check that latency metrics are doing some work. |
| Summary.Child.Value usageAggrLatency = ResourceGroupService.getRgUsageAggregationLatency(); |
| Assert.assertNotEquals(usageAggrLatency.count, 0); |
| Assert.assertNotEquals(usageAggrLatency.sum, 0); |
| double fiftiethPercentileValue = usageAggrLatency.quantiles.get(0.5); |
| Assert.assertNotEquals(fiftiethPercentileValue, 0); |
| double ninetiethPercentileValue = usageAggrLatency.quantiles.get(0.9); |
| Assert.assertNotEquals(ninetiethPercentileValue, 0); |
| |
| Summary.Child.Value quotaCalcLatency = ResourceGroupService.getRgQuotaCalculationTime(); |
| Assert.assertNotEquals(quotaCalcLatency.count, 0); |
| Assert.assertNotEquals(quotaCalcLatency.sum, 0); |
| fiftiethPercentileValue = quotaCalcLatency.quantiles.get(0.5); |
| Assert.assertNotEquals(fiftiethPercentileValue, 0); |
| ninetiethPercentileValue = quotaCalcLatency.quantiles.get(0.9); |
| Assert.assertNotEquals(ninetiethPercentileValue, 0); |
| } |
| |
| // Empirically, there appears to be a 45-byte per-message metadata overhead imposed by the Pulsar runtime. |
| private static final int PER_MESSAGE_METADATA_OHEAD = 45; |
| |
| private static final int PUBLISH_INTERVAL_SECS = 10; |
| private static final int NUM_PRODUCERS = 4; |
| private static final int NUM_CONSUMERS = 4; |
| private static final int NUM_MESSAGES_PER_PRODUCER = 100; |
| private static final int NUM_TOPICS = 8; // >= NUM_PRODUCERS; producer ix sends on topics[ix % NUM_TOPICS], so each producer gets its own topic |
| private static final int NUM_RESOURCE_GROUPS = 4; // arbitrarily, half of NUM_TOPICS, so two topics map to each RG |
| private static final int NUM_TOTAL_MESSAGES = NUM_MESSAGES_PER_PRODUCER * NUM_PRODUCERS; |
| private static final int NUM_MESSAGES_PER_CONSUMER = NUM_TOTAL_MESSAGES / NUM_CONSUMERS; |
| private final org.apache.pulsar.common.policies.data.ResourceGroup rgConfig = |
| new org.apache.pulsar.common.policies.data.ResourceGroup(); |
| private ResourceGroupService rgservice; |
| |
| private final String clusterName = "test"; |
| private final String BaseRGName = "rg-"; |
| private final String BaseTestTopicName = "rgusage-topic-"; |
| |
| private final String[] RGNames = new String[NUM_RESOURCE_GROUPS]; |
| |
| // The number of local usage reports we pretend to send (the dummy quota-calculator |
| // "reports" on every call, so this counts those calls). |
| private long numLocalUsageReports; |
| |
| // Combinations of tenant and namespace required to test RG use cases when both NS and tenant are under the control |
| // of the same RG, vs. cases where they are under the control of distinct RGs. |
| // [This is required to test the special case of "tenant and NS refer to the same RG", because in that case |
| // we don't double-count the usage.] |
| // Same-order mapping: e.g., rg-0/rg-0 (for 0th entry) |
| private final String[] TenantAndNsNameSameOrder = new String[NUM_RESOURCE_GROUPS]; |
| |
| // Opposite order mapping: e.g., rg-0/rg-49 (for 0th entry with 50 RGs) |
| private final String[] TenantAndNsNameOppositeOrder = new String[NUM_RESOURCE_GROUPS]; |
| |
| // Similar to above (same and opposite order) for topics. |
| // E.g., rg-0/rg-0/rgusage-topic0 for 0-th topic in "same order" |
| // and rg-0/rg-49/rgusage-topic0 for 0-th topic in "opposite order", with 50 RGs |
| private final String[] TopicNamesSameTenantAndNsRGs = new String[NUM_TOPICS]; |
| private final String[] TopicNamesDifferentTenantAndNsRGs = new String[NUM_TOPICS]; |
| |
| // Persistent and non-persistent topic strings with the above names. |
| private final String[] PersistentTopicNamesSameTenantAndNsRGs = new String[NUM_TOPICS]; |
| private final String[] PersistentTopicNamesDifferentTenantAndNsRGs = new String[NUM_TOPICS]; |
| private final String[] NonPersistentTopicNamesSameTenantAndNsRGs = new String[NUM_TOPICS]; |
| private final String[] NonPersistentTopicNamesDifferentTenantAndNsRGs = new String[NUM_TOPICS]; |
| |
| |
| // We don't periodically report to a remote broker in this test, so we use cumulative stats. |
| private final ResourceGroupUsageStatsType getCumulativeUsageStats = ResourceGroupUsageStatsType.Cumulative; |
| |
| // Keep track of the namespaces that were created, so we don't dup and get exceptions |
| HashSet<String> createdNamespaces = new HashSet<>(); |
| |
| // Keep track of the topics that were created, so we don't dup and get exceptions |
| HashSet<String> createdTopics = new HashSet<>(); |
| |
| // Keep track of the tenants that have been registered to their RGs, so we don't dup and get exceptions |
| HashSet<String> registeredTenants = new HashSet<>(); |
| |
| // Keep track of the namespaces that have been registered to their RGs, so we don't dup and get exceptions |
| HashSet<String> registeredNamespaces = new HashSet<>(); |
| |
| // Prometheus stats are monotonically increasing numbers. |
| // On each run, the resource-group metrics are incremented in Prometheus. |
| // So, we keep some residuals to help isolate/verify "this run's" values. |
| long residualTenantRegs; |
| long residualNamespaceRegs; |
| long residualSentNumBytes; |
| long residualSentNumMessages; |
| long residualRecvdNumBytes; |
| long residualRecvdNumMessages; |
| |
| // Create the topics provided |
| private void createTopics(String[] topics) { |
| BrokerService bs = this.pulsar.getBrokerService(); |
| for (String topic : topics) { |
| if (!createdTopics.contains(topic)) { |
| bs.getOrCreateTopic(topic); |
| createdTopics.add(topic); |
| } |
| } |
| } |
| |
| // Destroy the topics provided |
| private void destroyTopics(String[] topics) { |
| BrokerService bs = this.pulsar.getBrokerService(); |
| for (String topic : topics) { |
| if (createdTopics.contains(topic)) { |
| bs.deleteTopic(topic, true); |
| createdTopics.remove(topic); |
| } |
| } |
| } |
| |
| // Create all the RGs named in RGNames[] |
| private void createRGs() throws Exception { |
| for (String rgname : RGNames) { |
| this.rgservice.resourceGroupCreate(rgname, rgConfig); |
| } |
| } |
| |
| // Destroy all the RGs named in RGNames[] |
| private void destroyRGs() throws Exception { |
| for (String rgname : RGNames) { |
| this.rgservice.resourceGroupDelete(rgname); |
| } |
| } |
| |
| // Initial set up for transport manager and cluster creation. |
| private void prepareForOps() throws PulsarAdminException { |
| this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS); |
| this.conf.setAllowAutoTopicCreation(true); |
| admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); |
| } |
| |
| // Set up the RG/tenant/namespace/topic names, and check the test parameters. |
| private void prepareRGs() throws Exception { |
| // Check for a few invariants which allow easier mapping of structures in the test. |
| |
| // Ensure that there are at least as many consumers as producers, and that the consumer |
| // count is a multiple of the producer count. |
| Assert.assertTrue(NUM_CONSUMERS >= NUM_PRODUCERS && NUM_CONSUMERS % NUM_PRODUCERS == 0); |
| |
| // Number of messages is a multiple of the number of topics. |
| Assert.assertEquals(NUM_TOTAL_MESSAGES % NUM_TOPICS, 0); |
| |
| // Ensure that the number of topics is a multiple of the number of RGs. |
| Assert.assertEquals(NUM_TOPICS % NUM_RESOURCE_GROUPS, 0); |
| |
| // Ensure that messages-per-consumer is positive, and that the consumers' and producers' expected totals agree. |
| final int NumConsumerMessages = NUM_MESSAGES_PER_CONSUMER * NUM_CONSUMERS; |
| final int NumProducerMessages = NUM_MESSAGES_PER_PRODUCER * NUM_PRODUCERS; |
| Assert.assertTrue(NUM_MESSAGES_PER_CONSUMER > 0 && NumConsumerMessages == NumProducerMessages); |
| |
| rgConfig.setPublishRateInBytes(1500L); |
| rgConfig.setPublishRateInMsgs(100); |
| rgConfig.setDispatchRateInBytes(4000L); |
| rgConfig.setDispatchRateInMsgs(500); |
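| // These rate settings merely populate the RG configuration; the assertions in this test |
| // exercise usage accounting and metrics, not rate enforcement. |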
| |
| // Set up the RG names; creation of RGs will be done elsewhere. |
| for (int ix = 0; ix < NUM_RESOURCE_GROUPS; ix++) { |
| RGNames[ix] = BaseRGName + ix; |
| } |
| |
| // Create all the tenants |
| final TenantInfo configInfo = |
| new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName)); |
| for (int ix = 0; ix < NUM_RESOURCE_GROUPS; ix++) { |
| admin.tenants().createTenant(RGNames[ix], configInfo); |
| } |
| |
| // Set up the tenant-and-nsname mapping strings, for same and opposite order of RGs. |
| for (int ix = 0; ix < NUM_RESOURCE_GROUPS; ix++) { |
| TenantAndNsNameSameOrder[ix] = RGNames[ix] + "/" + RGNames[ix]; |
| TenantAndNsNameOppositeOrder[ix] = RGNames[ix] + "/" + RGNames[NUM_RESOURCE_GROUPS - (ix + 1)]; |
| } |
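| // E.g., with NUM_RESOURCE_GROUPS == 4: the same-order entries are rg-0/rg-0 .. rg-3/rg-3, |
| // and the opposite-order entries are rg-0/rg-3, rg-1/rg-2, rg-2/rg-1, rg-3/rg-0. |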
| |
| // Create all the namespaces |
| for (int ix = 0; ix < NUM_RESOURCE_GROUPS; ix++) { |
| if (!createdNamespaces.contains(TenantAndNsNameSameOrder[ix])) { |
| admin.namespaces().createNamespace(TenantAndNsNameSameOrder[ix]); |
| admin.namespaces().setNamespaceReplicationClusters( |
| TenantAndNsNameSameOrder[ix], Sets.newHashSet(clusterName)); |
| createdNamespaces.add(TenantAndNsNameSameOrder[ix]); |
| } |
| |
| if (!createdNamespaces.contains(TenantAndNsNameOppositeOrder[ix])) { |
| admin.namespaces().createNamespace(TenantAndNsNameOppositeOrder[ix]); |
| admin.namespaces().setNamespaceReplicationClusters( |
| TenantAndNsNameOppositeOrder[ix], Sets.newHashSet(clusterName)); |
| createdNamespaces.add(TenantAndNsNameOppositeOrder[ix]); |
| } |
| } |
| |
| // Create all the topic name strings |
| for (int ix = 0; ix < NUM_TOPICS; ix++) { |
| TopicNamesSameTenantAndNsRGs[ix] = |
| TenantAndNsNameSameOrder[ix % NUM_RESOURCE_GROUPS] + "/" + BaseTestTopicName + ix; |
| TopicNamesDifferentTenantAndNsRGs[ix] = |
| TenantAndNsNameOppositeOrder[ix % NUM_RESOURCE_GROUPS] + "/" + BaseTestTopicName + ix; |
| } |
| |
| // Create all the persistent and non-persistent topic strings |
| for (int ix = 0; ix < NUM_TOPICS; ix++) { |
| PersistentTopicNamesSameTenantAndNsRGs[ix] = |
| "persistent://" + TopicNamesSameTenantAndNsRGs[ix]; |
| PersistentTopicNamesDifferentTenantAndNsRGs[ix] = |
| "persistent://" + TopicNamesDifferentTenantAndNsRGs[ix]; |
| |
| NonPersistentTopicNamesSameTenantAndNsRGs[ix] = |
| "non-persistent://" + TopicNamesSameTenantAndNsRGs[ix]; |
| NonPersistentTopicNamesDifferentTenantAndNsRGs[ix] = |
| "non-persistent://" + TopicNamesDifferentTenantAndNsRGs[ix]; |
| } |
| } |
| } |