[PIP-82] [pulsar-broker] Add a task to publish resource usage to a topic (#10008)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bbb6896..856f826 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -680,6 +680,26 @@
+ " non-backlog consumers as well.")
private boolean dispatchThrottlingOnNonBacklogConsumerEnabled = false;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Default policy for publishing usage reports to system topic is disabled."
+ + "This enables publishing of usage reports"
+ )
+ private String resourceUsageTransportClassName = "";
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Topic to publish usage reports to if resourceUsagePublishToTopic is enabled."
+ )
+ private String resourceUsageTransportPublishTopicName = "non-persistent://pulsar/system/resource-usage";
+
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled."
+ )
+ private int resourceUsageTransportPublishIntervalInSecs = 60;
+
// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 3bcdd82..49609c2 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -472,6 +472,7 @@
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
<checkStaleness>true</checkStaleness>
+ <excludes>**/ResourceUsage.proto</excludes>
</configuration>
<executions>
<execution>
@@ -483,6 +484,22 @@
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>com.github.splunk.lightproto</groupId>
+ <artifactId>lightproto-maven-plugin</artifactId>
+ <version>${lightproto-maven-plugin.version}</version>
+ <configuration>
+ <sources>${project.basedir}/src/main/proto/ResourceUsage.proto</sources>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<resources>
<resource>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aac8755..7235aef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -30,6 +30,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -85,6 +86,7 @@
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
+import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
@@ -177,6 +179,7 @@
private GlobalZooKeeperCache globalZkCache;
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private Compactor compactor;
+ private ResourceUsageTransportManager resourceUsageTransportManager;
private final ScheduledExecutorService executor;
private final ScheduledExecutorService cacheExecutor;
@@ -732,6 +735,15 @@
this.startPackagesManagementService();
}
+ // Start the task to publish resource usage, if necessary
+ if (config.getResourceUsageTransportClassName() != null
+ && config.getResourceUsageTransportClassName() != "") {
+ Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
+ Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
+ Object object = ctor.newInstance(new Object[] { this });
+ this.resourceUsageTransportManager = (ResourceUsageTransportManager) object;
+ }
+
final String bootstrapMessage = "bootstrap service "
+ (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "")
+ (config.getWebServicePortTls().isPresent() ? ", tls-port = " + config.getWebServicePortTls() : "")
@@ -1368,6 +1380,10 @@
return topicPoliciesService;
}
+ public ResourceUsageTransportManager getResourceUsageTransportManager() {
+ return resourceUsageTransportManager;
+ }
+
public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
if (metricsServlet == null) {
if (pendingMetricsProviders == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageConsumer.java
new file mode 100644
index 0000000..a8be076
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageConsumer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
+
+/*
+ * Interface that a resource owner (tenant, namespace or topic) needs to implement.
+ */
+public interface ResourceUsageConsumer {
+ /*
+ * Get the unique identifier for the owner
+ * @return return the owner ID
+ */
+ String getID();
+
+ /*
+ * Listener for resource usage published by other brokers
+ * @param broker name
+ * @param Resource usage object
+ */
+ void acceptResourceUsage(String broker, ResourceUsage resourceUsage);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsagePublisher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsagePublisher.java
new file mode 100644
index 0000000..7ae0bab
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsagePublisher.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
+
+/*
+ * Interface that a resource owner (tenant, namespace or topic) needs to implement.
+ */
+public interface ResourceUsagePublisher {
+ /*
+ * Get the unique identifier for the owner
+ * @return return the owner ID
+ */
+ String getID();
+
+ /*
+ * Fill the resource usage object that is passed in
+ * @param Resource Usage object
+ */
+ void fillResourceUsage(ResourceUsage resourceUsage);
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
new file mode 100644
index 0000000..f7064e4
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManager.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.apache.pulsar.client.api.CompressionType.SNAPPY;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsageInfo;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource Usage Transport Manager
+ *
+ * <P>Module to exchange usage information with other brokers. Implements a task to periodically.
+ * <P>publish the usage as well as handlers to process the usage info from other brokers.
+ *
+ * @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
+ *
+ */
+public class ResourceUsageTransportManager implements AutoCloseable {
+
+ private class ResourceUsageWriterTask implements Runnable, AutoCloseable {
+ private final Producer<byte[]> producer;
+ private final ScheduledFuture<?> resourceUsagePublishTask;
+
+ private Producer<byte[]> createProducer() throws PulsarClientException {
+ final int publishDelayMilliSecs = 10;
+ final int sendTimeoutSecs = 10;
+
+ return pulsarClient.newProducer()
+ .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+ .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
+ .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
+ .blockIfQueueFull(false)
+ .compressionType(SNAPPY)
+ .create();
+ }
+
+ public ResourceUsageWriterTask() throws PulsarClientException {
+ producer = createProducer();
+ resourceUsagePublishTask = pulsarService.getExecutor().scheduleAtFixedRate(
+ this,
+ pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
+ pulsarService.getConfig().getResourceUsageTransportPublishIntervalInSecs(),
+ TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void run() {
+ if (!publisherMap.isEmpty()) {
+ ResourceUsageInfo rUsageInfo = new ResourceUsageInfo();
+ rUsageInfo.setBroker(pulsarService.getBrokerServiceUrl());
+
+ publisherMap.forEach((key, item) -> item.fillResourceUsage(rUsageInfo.addUsageMap()));
+
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.heapBuffer(rUsageInfo.getSerializedSize());
+ rUsageInfo.writeTo(buf);
+
+ byte[] bytes = buf.array();
+ producer.sendAsync(bytes).whenComplete((id, ex) -> {
+ if (null != ex) {
+ LOG.error("Resource usage publisher: sending message ID {} error {}", id, ex);
+ }
+ buf.release();
+ });
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ resourceUsagePublishTask.cancel(true);
+ producer.close();
+ }
+ }
+
+ private class ResourceUsageReader implements ReaderListener<byte[]>, AutoCloseable {
+ private final ResourceUsageInfo recdUsageInfo = new ResourceUsageInfo();
+ private final Reader<byte[]> consumer;
+
+ public ResourceUsageReader() throws PulsarClientException {
+ consumer = pulsarClient.newReader()
+ .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
+ .startMessageId(MessageId.latest)
+ .readerListener(this)
+ .create();
+ }
+
+ @Override
+ public void close() throws Exception {
+ consumer.close();
+ }
+
+ @Override
+ public void received(Reader<byte[]> reader, Message<byte[]> msg) {
+ try {
+ recdUsageInfo.parseFrom(Unpooled.wrappedBuffer(msg.getData()), msg.getData().length);
+
+ recdUsageInfo.getUsageMapsList().forEach(ru -> {
+ ResourceUsageConsumer owner = consumerMap.get(ru.getOwner());
+ if (owner != null) {
+ owner.acceptResourceUsage(recdUsageInfo.getBroker(), ru);
+ }
+ });
+
+ } catch (IllegalStateException exception) {
+ LOG.error("Resource usage reader: Error parsing incoming message {}", exception);
+ } catch (Exception exception) {
+ LOG.error("Resource usage reader: Unknown exception while parsing message {}", exception);
+ }
+ }
+
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTransportManager.class);
+ private final PulsarService pulsarService;
+ private final PulsarClient pulsarClient;
+ private final ResourceUsageWriterTask pTask;
+ private final ResourceUsageReader consumer;
+ private final Map<String, ResourceUsagePublisher>
+ publisherMap = new ConcurrentHashMap<String, ResourceUsagePublisher>();
+ private final Map<String, ResourceUsageConsumer>
+ consumerMap = new ConcurrentHashMap<String, ResourceUsageConsumer>();
+
+ private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException {
+ // Create a public tenant and default namespace
+ TopicName topicName = TopicName.get(pulsarService.getConfig().getResourceUsageTransportPublishTopicName());
+
+ PulsarAdmin admin = pulsarService.getAdminClient();
+ ServiceConfiguration config = pulsarService.getConfig();
+ String cluster = config.getClusterName();
+
+ final String tenant = topicName.getTenant();
+ final String namespace = topicName.getNamespace();
+
+ List<String> tenantList = admin.tenants().getTenants();
+ if (!tenantList.contains(tenant)) {
+ admin.tenants().createTenant(tenant,
+ new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+ }
+ List<String> nsList = admin.namespaces().getNamespaces(tenant);
+ if (!nsList.contains(namespace)) {
+ admin.namespaces().createNamespace(namespace);
+ }
+ }
+
+ public ResourceUsageTransportManager(PulsarService pulsarService) throws Exception {
+ this.pulsarService = pulsarService;
+ this.pulsarClient = pulsarService.getClient();
+
+ try {
+ createTenantAndNamespace();
+ consumer = new ResourceUsageReader();
+ pTask = new ResourceUsageWriterTask();
+ } catch (Exception ex) {
+ LOG.error("Error initializing resource usage transport manager: {}", ex);
+ throw ex;
+ }
+ }
+
+ /*
+ * Register a resource owner (resource-group, tenant, namespace, topic etc).
+ *
+ * @param resource usage publisher
+ */
+ public void registerResourceUsagePublisher(ResourceUsagePublisher r) {
+ publisherMap.put(r.getID(), r);
+ }
+
+ /*
+ * Unregister a resource owner (resource-group, tenant, namespace, topic etc).
+ *
+ * @param resource usage publisher
+ */
+ public void unregisterResourceUsageProducer(ResourceUsagePublisher r) {
+ publisherMap.remove(r.getID());
+ }
+
+ /*
+ * Register a resource owner (resource-group, tenant, namespace, topic etc).
+ *
+ * @param resource usage consumer
+ */
+ public void registerResourceUsageConsumer(ResourceUsageConsumer r) {
+ consumerMap.put(r.getID(), r);
+ }
+
+ /*
+ * Unregister a resource owner (resource-group, tenant, namespace, topic etc).
+ *
+ * @param resource usage consumer
+ */
+ public void unregisterResourceUsageConsumer(ResourceUsageConsumer r) {
+ consumerMap.remove(r.getID());
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ pTask.close();
+ consumer.close();
+ } catch (Exception ex1) {
+ LOG.error("Error closing producer/consumer for resource-usage topic {}", ex1);
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/package-info.java
new file mode 100644
index 0000000..ba7ee9e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
\ No newline at end of file
diff --git a/pulsar-broker/src/main/proto/ResourceUsage.proto b/pulsar-broker/src/main/proto/ResourceUsage.proto
new file mode 100644
index 0000000..4706c9d
--- /dev/null
+++ b/pulsar-broker/src/main/proto/ResourceUsage.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.resource.usage;
+option java_package = "org.apache.pulsar.broker.service.resource.usage";
+option optimize_for = SPEED;
+
+message NetworkUsage {
+ required uint64 bytesPerPeriod = 1;
+ required uint64 messagesPerPeriod = 2;
+}
+
+message StorageUsage {
+ required uint64 totalBytes = 1;
+}
+
+message ResourceUsage {
+ // owner is the key(ID) of the entity that reports the usage
+ // It could be a resource-group or tenant, namespace or a topic
+ required string owner = 1;
+ optional NetworkUsage publish = 2;
+ optional NetworkUsage dispatch = 3;
+ optional StorageUsage storage = 4;
+}
+
+message ResourceUsageInfo {
+ // Name of the broker
+ required string broker = 1;
+
+ repeated ResourceUsage usageMap = 2;
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
new file mode 100644
index 0000000..4c1f186
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
+import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
+
+public class ResourceUsageTransportManagerTest extends MockedPulsarServiceBaseTest {
+
+ private static final String INTERNAL_TOPIC = "non-persistent://pulsar-test/test/resource-usage";
+ private static final int PUBLISH_INTERVAL_SECS = 1;
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ prepareData();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testNamespaceCreation() throws Exception {
+ ResourceUsageTransportManager tManager = new ResourceUsageTransportManager(pulsar);
+ TopicName topicName = TopicName.get(INTERNAL_TOPIC);
+
+ assertTrue(admin.tenants().getTenants().contains(topicName.getTenant()));
+ assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace()));
+
+ }
+
+ @Test
+ public void testPublish() throws Exception {
+ ResourceUsageTransportManager tManager = new ResourceUsageTransportManager(pulsar);
+ ResourceUsage recvdUsage = new ResourceUsage();
+ final String[] recvdBroker = new String[1];
+
+ ResourceUsagePublisher p = new ResourceUsagePublisher() {
+
+ @Override
+ public String getID() {
+ return "resource-group1";
+ }
+
+ @Override
+ public void fillResourceUsage(ResourceUsage resourceUsage) {
+
+ resourceUsage.setOwner(getID());
+ resourceUsage.setPublish().setMessagesPerPeriod(1000).setBytesPerPeriod(10001);
+ resourceUsage.setStorage().setTotalBytes(500003);
+
+ }
+ };
+
+ ResourceUsageConsumer c = new ResourceUsageConsumer() {
+ @Override
+ public String getID() {
+ return "resource-group1";
+ }
+
+ @Override
+ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
+
+ recvdBroker[0] = broker;
+ recvdUsage.setOwner(resourceUsage.getOwner());
+ NetworkUsage p = recvdUsage.setPublish();
+ p.setBytesPerPeriod(resourceUsage.getPublish().getBytesPerPeriod());
+ p.setMessagesPerPeriod(resourceUsage.getPublish().getMessagesPerPeriod());
+
+ recvdUsage.setStorage().setTotalBytes(resourceUsage.getStorage().getTotalBytes());
+ }
+ };
+
+ tManager.registerResourceUsagePublisher(p);
+ tManager.registerResourceUsageConsumer(c);
+
+ Thread.sleep((PUBLISH_INTERVAL_SECS + 1) * 1000);
+
+ assertEquals(recvdBroker[0], pulsar.getBrokerServiceUrl());
+ assertNotNull(recvdUsage.getPublish());
+ assertNotNull(recvdUsage.getStorage());
+ assertEquals(recvdUsage.getPublish().getBytesPerPeriod(), 10001);
+ assertEquals(recvdUsage.getStorage().getTotalBytes(), 500003);
+ }
+
+ private void prepareData() throws PulsarAdminException {
+ this.conf.setResourceUsageTransportClassName("org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager");
+ this.conf.setResourceUsageTransportPublishTopicName(INTERNAL_TOPIC);
+ this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+ admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl()));
+ }
+}
\ No newline at end of file